workers no route to host

2015-03-31 Thread ZhuGe
Hi, I set up a standalone cluster of 5 machines (tmaster, tslave1,2,3,4) with 
spark-1.3.0-cdh5.4.0-snapshot. When I execute sbin/start-all.sh, the 
master is OK, but I can't see the web UI. Moreover, the worker logs look 
like this:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
/data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop: command not found
Spark Command: java -cp :/data/PlatformDep/cdh5/dist/sbin/../conf:/data/PlatformDep/cdh5/dist/lib/spark-assembly-1.3.0-cdh5.4.0-SNAPSHOT-hadoop2.6.0-cdh5.4.0-SNAPSHOT.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-rdbms-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-api-jdo-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-core-3.2.2.jar: -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.128.16:7071 --webui-port 8081

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/31 06:47:22 INFO Worker: Registered signal handlers for [TERM, HUP, INT]
15/03/31 06:47:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/31 06:47:23 INFO SecurityManager: Changing view acls to: dcadmin
15/03/31 06:47:23 INFO SecurityManager: Changing modify acls to: dcadmin
15/03/31 06:47:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dcadmin); users with modify permissions: Set(dcadmin)
15/03/31 06:47:23 INFO Slf4jLogger: Slf4jLogger started
15/03/31 06:47:23 INFO Remoting: Starting remoting
15/03/31 06:47:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@tslave2:60815]
15/03/31 06:47:24 INFO Utils: Successfully started service 'sparkWorker' on port 60815.
15/03/31 06:47:24 INFO Worker: Starting Spark worker tslave2:60815 with 2 cores, 3.0 GB RAM
15/03/31 06:47:24 INFO Worker: Running Spark version 1.3.0
15/03/31 06:47:24 INFO Worker: Spark home: /data/PlatformDep/cdh5/dist
15/03/31 06:47:24 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/31 06:47:24 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/03/31 06:47:24 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/03/31 06:47:24 INFO WorkerWebUI: Started WorkerWebUI at http://tslave2:8081
15/03/31 06:47:24 INFO Worker: Connecting to master akka.tcp://sparkMaster@192.168.128.16:7071/user/Master...
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]


The worker machines can ping the master machine successfully. The hosts file looks like this:

192.168.128.16 tmaster tmaster
192.168.128.17 tslave1 tslave1
192.168.128.18 tslave2 tslave2
192.168.128.19 tslave3 tslave3
192.168.128.20 tslave4 tslave4
Hope someone could help. Thanks   

Re: Parquet Hive table become very slow on 1.3?

2015-03-31 Thread Cheng Lian

Hi Xudong,

This is probably because Parquet schema merging is turned on by 
default. This is generally useful for Parquet files with different but 
compatible schemas, but it needs to read metadata from all Parquet 
part-files. That can be problematic when reading Parquet files with lots 
of part-files, especially when the user doesn't need schema merging.


This issue is tracked by SPARK-6575, and here is a PR for it: 
https://github.com/apache/spark/pull/5231. This PR adds a configuration 
to disable schema merging by default when doing Hive metastore Parquet 
table conversion.


Another workaround is to fall back to the old Parquet code path by setting 
spark.sql.parquet.useDataSourceApi to false.
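
For example (just a sketch, assuming a HiveContext or SQLContext named sqlContext):

    // Fall back to the old Parquet code path instead of the new data source API.
    sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
    // equivalently: sqlContext.sql("SET spark.sql.parquet.useDataSourceApi=false")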


Cheng

On 3/31/15 2:47 PM, Zheng, Xudong wrote:

Hi all,

We are using Parquet Hive tables, and we are upgrading to Spark 1.3. 
But we find that even a simple COUNT(*) query is much slower (about 100x) 
than on Spark 1.2.


I found that most of the time is spent on the driver getting HDFS block 
locations. A large number of log lines like the ones below are printed:


15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
   fileLength=77153436
   underConstruction=false
   blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
   lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]}
   isLastBlockComplete=true}
15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010


Comparing the printed logs with Spark 1.2: although the number of 
getBlockLocations calls is similar, each such call only cost 
20~30 ms there (versus 2000~3000 ms now), and it didn't print the 
detailed LocatedBlocks info.


Another finding is that if I read the Parquet file via Scala code from 
spark-shell as below, it looks fine; the computation returns the 
result as quickly as before.


sqlContext.parquetFile("data/myparquettable")

Any idea about it? Thank you!


--
郑旭东
Zheng, Xudong





Re: log4j.properties in jar

2015-03-31 Thread Emre Sevinc
Hello Udit,

Yes, what you ask is possible. If you follow the Spark documentation and
tutorial about how to build stand-alone applications, you can see that it
is possible to build a stand-alone, über-JAR file that includes everything.

For example, if you want to suppress some messages by modifying log4j in
unit tests, you can do the following:
http://stackoverflow.com/questions/27248997/how-to-suppress-spark-logging-in-unit-tests/2736#2736
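
A common alternative (a general log4j call, not necessarily what the linked
answer does) is to raise the level of the noisy loggers programmatically in the
test setup, before the SparkContext is created:

    import org.apache.log4j.{Level, Logger}

    // Quiet Spark and Akka logging in unit tests; adjust the levels as needed.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)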

Hope this helps.

--
Emre Sevinç
http://www.bigindustries.be/


On Mon, Mar 30, 2015 at 10:24 PM, Udit Mehta  wrote:

> Hi,
>
>
> Is it possible to put the log4j.properties in the application jar such
> that the driver and the executors use this log4j file. Do I need to specify
> anything while submitting my app so that this file is used?
>
> Thanks,
> Udit
>



-- 
Emre Sevinc


Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Jaonary Rabarisoa
Following your suggestion, I end up with the following implementation :







override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    // net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}


The idea is to mapPartitions over the underlying RDD of the DataFrame and
create a new DataFrame by zipping the results. It seems to work, but when I
try to save the RDD I get the following error:

org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row


On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> One workaround could be to convert a DataFrame into a RDD inside the
> transform function and then use mapPartitions/broadcast to work with the
> JNI calls and then convert back to RDD.
>
> Thanks
> Shivaram
>
> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
> wrote:
>
>> Dear all,
>>
>> I'm still struggling to make a pre-trained caffe model transformer for
>> dataframe works. The main problem is that creating a caffe model inside the
>> UDF is very slow and consumes memories.
>>
>> Some of you suggest to broadcast the model. The problem with broadcasting
>> is that I use a JNI interface to caffe C++ with javacpp-preset  and it is
>> not serializable.
>>
>> Another possible approach is to use a UDF that can handle a whole
>> partitions instead of just a row in order to minimize the caffe model
>> instantiation.
>>
>> Is there any ideas to solve one of these two issues ?
>>
>>
>>
>> Best,
>>
>> Jao
>>
>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
>> wrote:
>>
>>> I see.  I think your best bet is to create the cnnModel on the master
>>> and then serialize it to send to the workers.  If it's big (1M or so), then
>>> you can broadcast it and use the broadcast variable in the UDF.  There is
>>> not a great way to do something equivalent to mapPartitions with UDFs right
>>> now.
>>>
>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Here is my current implementation with current master version of spark




class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    } : Vector)

    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

 where CaffeModel is a Java API to a Caffe C++ model.

 The problem here is that for every row it will create a new instance of
 CaffeModel, which is inefficient since creating a new model
 means loading a large model file, and it will transform only a single
 row at a time. A Caffe network, however, can process a batch of rows efficiently.
 In other words, is it possible to create a UDF that operates on a
 partition in order to minimize the creation of CaffeModel instances and
 to take advantage of the Caffe network's batch processing?



 On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley 
 wrote:

> I see, thanks for clarifying!
>
> I'd recommend following existing implementations in spark.ml
> transformers.  You'll need to define a

Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
I have updated my Spark source code to 1.3.1.

The checkpoint works well. 

But the shuffle data still cannot be deleted automatically… the disk usage is 
still 30TB…

I have set spark.cleaner.referenceTracking.blocking.shuffle to true.

Do you know how to solve my problem?

Sendong Li
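
For reference, the checkpointing setup looks roughly like this (illustrative
values and path; it relies on the setCheckpointInterval setter mentioned below):

    import org.apache.spark.mllib.recommendation.ALS

    // A checkpoint directory must be set on the SparkContext,
    // otherwise ALS checkpointing has no effect.
    sc.setCheckpointDir("hdfs:///tmp/als-checkpoint")  // illustrative path

    val model = new ALS()
      .setImplicitPrefs(true)
      .setRank(rank)
      .setIterations(iterations)
      .setLambda(lambda)
      .setAlpha(alpha)
      .setCheckpointInterval(5)  // illustrative: checkpoint every 5 iterations
      .run(ratings)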



> On Mar 31, 2015, at 12:11 AM, Xiangrui Meng wrote:
> 
> setCheckpointInterval was added in the current master and branch-1.3. Please 
> help check whether it works. It will be included in the 1.3.1 and 1.4.0 
> release. -Xiangrui
> 
> On Mon, Mar 30, 2015 at 7:27 AM, lisendong  > wrote:
> hi, xiangrui:
> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
> the code is :
> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>  
> 
> 
> 
> the checkpoint is very important in my situation, because my task will 
> produce 1TB of shuffle data in each iteration; if the shuffle data is not 
> deleted in each iteration (using checkpoint()), the task will produce 30TB of 
> data…
> 
> 
> So I changed the ALS code and re-compiled it myself, but it seems the 
> checkpoint does not take effect, and the task still occupies 30TB of disk… (I 
> only added two lines to ALS.scala):
> 
> 
> 
> 
> 
> and the driver’s log seems strange, why the log is printed together...
> 
> 
> thank you very much!
> 
> 
>> On Feb 26, 2015, at 11:33 PM, 163 <lisend...@163.com> wrote:
>> 
>> Thank you very much for your opinion:)
>> 
>> In our case, maybe it 's dangerous to treat un-observed item as negative 
>> interaction(although we could give them small confidence, I think they are 
>> still incredible...)
>> 
>> I will do more experiments and give you feedback:)
>> 
>> Thank you;)
>> 
>> 
>>> On Feb 26, 2015, at 23:16, Sean Owen wrote:
>>> 
>>> I believe that's right, and is what I was getting at. yes the implicit
>>> formulation ends up implicitly including every possible interaction in
>>> its loss function, even unobserved ones. That could be the difference.
>>> 
>>> This is mostly an academic question though. In practice, you have
>>> click-like data and should be using the implicit version for sure.
>>> 
>>> However you can give negative implicit feedback to the model. You
>>> could consider no-click as a mild, observed, negative interaction.
>>> That is: supply a small negative value for these cases. Unobserved
>>> pairs are not part of the data set. I'd be careful about assuming the
>>> lack of an action carries signal.
>>> 
 On Thu, Feb 26, 2015 at 3:07 PM, 163 >>> > wrote:
 oh my god, I think I understood...
 In my case, there are three kinds of user-item pairs:
 
 Display and click pair(positive pair)
 Display but no-click pair(negative pair)
 No-display pair(unobserved pair)
 
 Explicit ALS only consider the first and the second kinds
 But implicit ALS consider all the three kinds of pair(and consider the 
 third
 kind as the second pair, because their preference value are all zero and
 confidence are all 1)
 
 So the result are different. right?
 
 Could you please give me some advice, which ALS should I use?
 If I use the implicit ALS, how to distinguish the second and the third kind
 of pair:)
 
 My opinion is in my case, I should use explicit ALS ...
 
 Thank you so much
 
 On Feb 26, 2015, at 22:41, Xiangrui Meng wrote:
 
 Lisen, did you use all m-by-n pairs during training? Implicit model
 penalizes unobserved ratings, while explicit model doesn't. -Xiangrui
 
> On Feb 26, 2015 6:26 AM, "Sean Owen"  > wrote:
> 
> +user
> 
>> On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen > > wrote:
>> 
>> I think I may have it backwards, and that you are correct to keep the 0
>> elements in train() in order to try to reproduce the same result.
>> 
>> The second formulation is called 'weighted regularization' and is used
>> for both implicit and explicit feedback, as far as I can see in the code.
>> 
>> Hm, I'm actually not clear why these would produce different results.
>> Different code paths are used to be sure, but I'm not yet sure why they
>> would give different results.
>> 
>> In general you wouldn't use train() for data like this though, and would
>> never set alpha=0.
>> 
>>> On Thu, Feb 26, 2015 at 2:15 PM, lisendong >> > wrote:
>>> 
>>> I want to confirm the loss function you use (sorry I’m not so familiar
>>> with scala code so I did not understand the source code of mllib)
>>> 
>>> According to the papers :
>>>

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Shivaram Venkataraman
My guess is that the `createDataFrame` call is failing here. Can you check
if the schema being passed to it includes the column name and type for the
newly zipped `features`?

Joseph probably knows this better, but AFAIK the DenseVector here will need
to be marked as a VectorUDT while creating a DataFrame column.
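
Something along these lines is what I have in mind (just a sketch, reusing the
names from your snippet):

    import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType}

    // The zipped vector has to travel inside a Row, and the schema handed to
    // createDataFrame has to declare that field as a VectorUDT.
    val featureField = StructField("cnnFeature", new VectorUDT, nullable = false)
    val newSchema = StructType(dataSet.schema.fields :+ featureField)

    val newRows = dataSet.rdd.zip(features).map { case (old, feat: Vector) =>
      Row.fromSeq(old.toSeq :+ feat)
    }
    val newDF = dataSet.sqlContext.createDataFrame(newRows, newSchema)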

Thanks
Shivaram

On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa 
wrote:

> Following your suggestion, I end up with the following implementation :
>
>
>
>
>
>
>
> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>   val map = this.paramMap ++ paramMap
>
>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>     Caffe.set_mode(Caffe.CPU)
>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>     val inputBlobs: FloatBlobVector = net.input_blobs()
>     val N: Int = 1
>     val K: Int = inputBlobs.get(0).channels()
>     val H: Int = inputBlobs.get(0).height()
>     val W: Int = inputBlobs.get(0).width()
>     inputBlobs.get(0).Reshape(N, K, H, W)
>     val dataBlob = new FloatPointer(N*K*W*H)
>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>
>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>       dataBlob.put(a.toArray, 0, a.size)
>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>       val resultDim = resultBlobs.get(0).channels()
>       logInfo(s"Output dimension $resultDim")
>       val resultBlobData = resultBlobs.get(0).cpu_data()
>       val output = new Array[Float](resultDim)
>       resultBlobData.get(output)
>       Vectors.dense(output.map(_.toDouble))
>     }
>     // net.deallocate()
>     feat
>   }
>
>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>     val oldSeq = old.toSeq
>     Row.fromSeq(oldSeq :+ feat)
>   }
>   dataSet.sqlContext.createDataFrame(newRowData, schema)
> }
>
>
> The idea is to mapPartitions of the underlying RDD of the DataFrame and
> create a new DataFrame by zipping the results. It seems to work but when I
> try to save the RDD I got the following error :
>
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
> org.apache.spark.sql.Row
>
>
> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> One workaround could be to convert a DataFrame into a RDD inside the
>> transform function and then use mapPartitions/broadcast to work with the
>> JNI calls and then convert back to RDD.
>>
>> Thanks
>> Shivaram
>>
>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Dear all,
>>>
>>> I'm still struggling to make a pre-trained caffe model transformer for
>>> dataframe works. The main problem is that creating a caffe model inside the
>>> UDF is very slow and consumes memories.
>>>
>>> Some of you suggest to broadcast the model. The problem with
>>> broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset
>>>  and it is not serializable.
>>>
>>> Another possible approach is to use a UDF that can handle a whole
>>> partitions instead of just a row in order to minimize the caffe model
>>> instantiation.
>>>
>>> Is there any ideas to solve one of these two issues ?
>>>
>>>
>>>
>>> Best,
>>>
>>> Jao
>>>
>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
>>> wrote:
>>>
 I see.  I think your best bet is to create the cnnModel on the master
 and then serialize it to send to the workers.  If it's big (1M or so), then
 you can broadcast it and use the broadcast variable in the UDF.  There is
 not a great way to do something equivalent to mapPartitions with UDFs right
 now.

 On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
 wrote:

> Here is my current implementation with current master version of spark
>
>
>
>
> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>
>   override def transformSchema(...) { ... }
>
>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>     transformSchema(dataSet.schema, paramMap, logging = true)
>     val map = this.paramMap ++ paramMap
>     val deepCNNFeature = udf((v: Vector) => {
>       val cnnModel = new CaffeModel
>       cnnModel.transform(v)
>     } : Vector)
>
>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>   }
> }
>
> where CaffeModel is a java api to Caffe C++ model.
>
> The problem here is that for every row it will create a new instance
> of CaffeModel which is inefficient s

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Jaonary Rabarisoa
In my transformSchema I do specify that the output column type is a VectorUDT :






override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
  val map = this.paramMap ++ paramMap
  checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
  addOutputColumn(schema, map(outputCol), new VectorUDT)
}


The output of printSchema is as follows:

 |-- cnnFeature: vector (nullable = false)



On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> My guess is that the `createDataFrame` call is failing here.  Can you
> check if the schema being passed to it includes the column name and type
> for the newly being zipped `features` ?
>
> Joseph probably knows this better, but AFAIK the DenseVector here will
> need to be marked as a VectorUDT while creating a DataFrame column
>
> Thanks
> Shivaram
>
> On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa 
> wrote:
>
>> Following your suggestion, I end up with the following implementation :
>>
>>
>>
>>
>>
>>
>>
>> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>>   val map = this.paramMap ++ paramMap
>>
>>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>>     Caffe.set_mode(Caffe.CPU)
>>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>>     val inputBlobs: FloatBlobVector = net.input_blobs()
>>     val N: Int = 1
>>     val K: Int = inputBlobs.get(0).channels()
>>     val H: Int = inputBlobs.get(0).height()
>>     val W: Int = inputBlobs.get(0).width()
>>     inputBlobs.get(0).Reshape(N, K, H, W)
>>     val dataBlob = new FloatPointer(N*K*W*H)
>>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>>
>>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>>       dataBlob.put(a.toArray, 0, a.size)
>>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>>       val resultDim = resultBlobs.get(0).channels()
>>       logInfo(s"Output dimension $resultDim")
>>       val resultBlobData = resultBlobs.get(0).cpu_data()
>>       val output = new Array[Float](resultDim)
>>       resultBlobData.get(output)
>>       Vectors.dense(output.map(_.toDouble))
>>     }
>>     // net.deallocate()
>>     feat
>>   }
>>
>>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>>     val oldSeq = old.toSeq
>>     Row.fromSeq(oldSeq :+ feat)
>>   }
>>   dataSet.sqlContext.createDataFrame(newRowData, schema)
>> }
>>
>>
>> The idea is to mapPartitions of the underlying RDD of the DataFrame and
>> create a new DataFrame by zipping the results. It seems to work but when I
>> try to save the RDD I got the following error :
>>
>> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
>> org.apache.spark.sql.Row
>>
>>
>> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> One workaround could be to convert a DataFrame into a RDD inside the
>>> transform function and then use mapPartitions/broadcast to work with the
>>> JNI calls and then convert back to RDD.
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Dear all,

 I'm still struggling to make a pre-trained caffe model transformer for
 dataframe works. The main problem is that creating a caffe model inside the
 UDF is very slow and consumes memories.

 Some of you suggest to broadcast the model. The problem with
 broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset
  and it is not serializable.

 Another possible approach is to use a UDF that can handle a whole
 partitions instead of just a row in order to minimize the caffe model
 instantiation.

 Is there any ideas to solve one of these two issues ?



 Best,

 Jao

 On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
 wrote:

> I see.  I think your best bet is to create the cnnModel on the master
> and then serialize it to send to the workers.  If it's big (1M or so), 
> then
> you can broadcast it and use the broadcast variable in the UDF.  There is
> not a great way to do something equivalent to mapPartitions with UDFs 
> right
> now.
>
> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
> wrote:
>
>> Here is my current implementation with current master version of
>> spark
>>
>>
>>
>>
>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>
>>   override def transformSchema(...) { ... }
>>
>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>     transformSchema(dataSet.schema, par

Re: JettyUtils.createServletHandler Method not Found?

2015-03-31 Thread kmader
Yes, this `private` is checked at compile time, and my class is in a subpackage
of org.apache.spark.ui, so the visibility is not the issue, or at least not
as far as I can tell.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JettyUtils-createServletHandler-Method-not-Found-tp22262p22313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error in Delete Table

2015-03-31 Thread Ted Yu
Which Spark and Hive release are you using ?

Thanks



> On Mar 27, 2015, at 2:45 AM, Masf  wrote:
> 
> Hi.
> 
> In HiveContext, when I put this statement "DROP TABLE IF EXISTS TestTable"
> If TestTable doesn't exist, spark returns an error:
> 
> 
> 
> ERROR Hive: NoSuchObjectException(message:default.TestTable table not found)
>   at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29338)
>   at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29306)
>   at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result.read(ThriftHiveMetastore.java:29237)
>   at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
>   at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1036)
>   at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1022)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
>   at com.sun.proxy.$Proxy22.getTable(Unknown Source)
>   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
>   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:942)
>   at 
> org.apache.hadoop.hive.ql.exec.DDLTask.dropTableOrPartitions(DDLTask.java:3887)
>   at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:310)
>   at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
>   at 
> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
>   at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1554)
>   at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1321)
>   at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1139)
>   at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962)
>   at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
>   at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
>   at 
> org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
>   at 
> org.apache.spark.sql.hive.execution.DropTable.sideEffectResult$lzycompute(commands.scala:58)
>   at 
> org.apache.spark.sql.hive.execution.DropTable.sideEffectResult(commands.scala:56)
>   at 
> org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
>   at 
> org.apache.spark.sql.hive.execution.DropTable.execute(commands.scala:51)
>   at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
>   at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
>   at 
> org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
>   at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
>   at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
>   at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
>   at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at GeoMain$.HiveExecution(GeoMain.scala:96)
>   at GeoMain$.main(GeoMain.scala:17)
>   at GeoMain.main(GeoMain.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 
> 
> Thanks!!
> -- 
> 
> 
> Regards.
> Miguel Ángel

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error in Delete Table

2015-03-31 Thread Masf
Hi Ted.

Spark 1.2.0 and Hive 0.13.1

Regards.
Miguel Angel.


On Tue, Mar 31, 2015 at 10:37 AM, Ted Yu  wrote:

> Which Spark and Hive release are you using ?
>
> Thanks
>
>
>
> > On Mar 27, 2015, at 2:45 AM, Masf  wrote:
> >
> > Hi.
> >
> > In HiveContext, when I put this statement "DROP TABLE IF EXISTS
> TestTable"
> > If TestTable doesn't exist, spark returns an error:
> >
> >
> >
> > ERROR Hive: NoSuchObjectException(message:default.TestTable table not
> found)
> >   at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29338)
> >   at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29306)
> >   at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result.read(ThriftHiveMetastore.java:29237)
> >   at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
> >   at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1036)
> >   at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1022)
> >   at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
> >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >   at java.lang.reflect.Method.invoke(Method.java:606)
> >   at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
> >   at com.sun.proxy.$Proxy22.getTable(Unknown Source)
> >   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
> >   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:942)
> >   at
> org.apache.hadoop.hive.ql.exec.DDLTask.dropTableOrPartitions(DDLTask.java:3887)
> >   at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:310)
> >   at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
> >   at
> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
> >   at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1554)
> >   at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1321)
> >   at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1139)
> >   at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962)
> >   at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
> >   at
> org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
> >   at
> org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
> >   at
> org.apache.spark.sql.hive.execution.DropTable.sideEffectResult$lzycompute(commands.scala:58)
> >   at
> org.apache.spark.sql.hive.execution.DropTable.sideEffectResult(commands.scala:56)
> >   at
> org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
> >   at
> org.apache.spark.sql.hive.execution.DropTable.execute(commands.scala:51)
> >   at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
> >   at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
> >   at
> org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
> >   at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
> >   at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
> >   at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
> >   at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
> >   at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >   at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >   at GeoMain$.HiveExecution(GeoMain.scala:96)
> >   at GeoMain$.main(GeoMain.scala:17)
> >   at GeoMain.main(GeoMain.scala)
> >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >   at java.lang.reflect.Method.invoke(Method.java:606)
> >   at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> >   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> >   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >
> >
> > Thanks!!
> > --
> >
> >
> > Regards.
> > Miguel Ángel
>



-- 


Saludos.
Miguel Ángel


Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
Thank you, @GuoQiang
I will try to add runGC() to the ALS.scala, and if it works for deleting the 
shuffle data, I will tell you :-)



> On Mar 31, 2015, at 4:47, GuoQiang Li wrote:
> 
> You can try to enforce garbage collection:
> 
> /** Run GC and make sure it actually has run */
> def runGC() {
>   val weakRef = new WeakReference(new Object())
>   val startTime = System.currentTimeMillis
>   System.gc() // Make a best effort to run the garbage collection. It 
> *usually* runs GC.
>   // Wait until a weak reference object has been GCed
>   System.runFinalization()
>   while (weakRef.get != null) {
> System.gc()
> System.runFinalization()
> Thread.sleep(200)
> if (System.currentTimeMillis - startTime > 1) {
>   throw new Exception("automatically cleanup error")
> }
>   }
> }
> 
> 
> ------ Original Message ------
> From: "lisendong" <lisend...@163.com>
> Date: Tuesday, Mar 31, 2015, 3:47
> To: "Xiangrui Meng" <men...@gmail.com>
> Cc: "Xiangrui Meng" <m...@databricks.com>; "user" <user@spark.apache.org>; 
> "Sean Owen" <so...@cloudera.com>; "GuoQiang Li" <wi...@qq.com>
> Subject: Re: different result from implicit ALS with explicit ALS
> 
> I have updated my Spark source code to 1.3.1.
> 
> The checkpoint works well. 
> 
> But the shuffle data still cannot be deleted automatically… the disk usage 
> is still 30TB…
> 
> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
> 
> Do you know how to solve my problem?
> 
> Sendong Li
> 
> 
> 
>> On Mar 31, 2015, at 12:11 AM, Xiangrui Meng wrote:
>> 
>> setCheckpointInterval was added in the current master and branch-1.3. Please 
>> help check whether it works. It will be included in the 1.3.1 and 1.4.0 
>> release. -Xiangrui
>> 
>> On Mon, Mar 30, 2015 at 7:27 AM, lisendong > > wrote:
>> hi, xiangrui:
>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>> the code is :
>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>>  
>> 
>> 
>> 
>> the checkpoint is very important in my situation, because my task will 
>> produce 1TB of shuffle data in each iteration; if the shuffle data is not 
>> deleted in each iteration (using checkpoint()), the task will produce 30TB of 
>> data…
>> 
>> 
>> So I changed the ALS code and re-compiled it myself, but it seems the 
>> checkpoint does not take effect, and the task still occupies 30TB of disk… (I 
>> only added two lines to ALS.scala):
>> 
>> 
>> 
>> 
>> 
>> and the driver's log seems strange; why is the log printed together...
>> 
>> 
>> thank you very much!
>> 
>> 
>>> On Feb 26, 2015, at 11:33 PM, 163 <lisend...@163.com> wrote:
>>> 
>>> Thank you very much for your opinion:)
>>> 
>>> In our case, maybe it 's dangerous to treat un-observed item as negative 
>>> interaction(although we could give them small confidence, I think they are 
>>> still incredible...)
>>> 
>>> I will do more experiments and give you feedback:)
>>> 
>>> Thank you;)
>>> 
>>> 
 On Feb 26, 2015, at 23:16, Sean Owen wrote:
 
 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.
 
 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.
 
 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. I'd be careful about assuming the
 lack of an action carries signal.
 
> On Thu, Feb 26, 2015 at 3:07 PM, 163  > wrote:
> oh my god, I think I understood...
> In my case, there are three kinds of user-item pairs:
> 
> Display and click pair(positive pair)
> Display but no-click pair(negative pair)
> No-display pair(unobserved pair)
> 
> Explicit ALS only consider the first and the second kinds
> But implicit ALS consider all the three kinds of pair(and consider the 
> third
> kind as the second pair, because their preference value are all zero and
> confidence are all 1)
> 
> So the result are different. right?
> 
> Could you please give me some advice, which ALS should I use?
> If I use the implicit ALS, how to distinguish the second and the third 
> kind
> of pair:)
> 
> My opinion is in my cas

Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-31 Thread Nicolas Phung
Hello,

@Akhil Das I'm trying to use the experimental API
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
.
I'm reusing the same code snippet to initialize my topicSet.
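
For reference, the initialization taken from that example looks roughly like
this (the broker address is illustrative):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // ssc is the StreamingContext; "toto" is the topic created with kafka-topics below.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topicsSet = Set("toto")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)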

@Cody Koeninger I don't see any previous error messages (see the full log
at the end). To create the topic, I'm doing the following :

"kafka-topics --create --zookeeper localhost:2181 --replication-factor
1 --partitions 10 --topic toto"

"kafka-topics --create --zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic toto-single"

I'm launching my Spark Streaming in local mode.

@Ted Yu There's no log "Couldn't connect to leader for topic", here's the
full version :

"spark-submit --conf config.resource=application-integration.conf --class
nextgen.Main assembly-0.1-SNAPSHOT.jar

15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
15/03/31 10:47:12 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(nphung); users with modify permissions: Set(nphung)
15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
15/03/31 10:47:13 INFO Remoting: Starting remoting
15/03/31 10:47:13 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@int.local:44180]
15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@int.local:44180]
15/03/31 10:47:13 INFO Utils: Successfully started service
'sparkDriver' on port 44180.
15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20150331104713-2238
15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file
server' on port 50204.
15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI'
on port 4040.
15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
15/03/31 10:47:16 INFO SparkContext: Added JAR
file:/home/nphung/assembly-0.1-SNAPSHOT.jar at
http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with
timestamp 1427791636151
15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block
manager localhost:40630 with 265.1 MB RAM, BlockManagerId(,
localhost, 40630)
15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
15/03/31 10:47:17 INFO EventLoggingListener: Logging events to
hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden to
15/03/31 10:47:17 INFO VerifiableProperties: Property
zookeeper.connect is overridden to
15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and
validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.MappedDStream@7fd8c559
15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO MappedDStream: Checkpoint inte

Broadcasting a parquet file using spark and python

2015-03-31 Thread jitesh129
How can we implement a BroadcastHashJoin in Spark with Python?

My Spark SQL inner joins are taking a lot of time since they are performed as a
ShuffledHashJoin.

Tables on which join is performed are stored as parquet files.
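
One knob worth noting (a hedged sketch, not a confirmed fix; the table and column
names below are made up): Spark SQL picks a BroadcastHashJoin when it can estimate
that one side of the join is smaller than spark.sql.autoBroadcastJoinThreshold
(in bytes). The same conf can be set from PySpark via sqlContext.setConf; the
snippet below is in Scala for illustration.

    // Raise the broadcast threshold so the small dimension table gets broadcast
    // (the 100 MB value here is illustrative).
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

    val joined = sqlContext.sql(
      "SELECT f.*, d.name FROM fact_table f JOIN small_dim d ON f.key = d.key")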

Please help.

Thanks and regards,
Jitesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to save dataframe with UDT created with sqlContext.createDataFrame

2015-03-31 Thread Jaonary Rabarisoa
Hi all,

A DataFrame with a user defined type (here mllib.Vector) created with
sqlContext.createDataFrame can't be saved to a parquet file; it raises a
ClassCastException:
org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row

Here is an example of code to reproduce this error :

object TestDataFrame {

  def main(args: Array[String]): Unit = {
    //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    val conf = new SparkConf()
      .setAppName("RankingEval")
      .setMaster("local[8]")
      .set("spark.executor.memory", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))

    val dataDF = data.toDF
    dataDF.save("test1.parquet")

    val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
    dataDF2.save("test2.parquet")
  }
}


Is this related to https://issues.apache.org/jira/browse/SPARK-5532
and how can it be solved ?


Cheers,


Jao


Re: spark there is no space on the disk

2015-03-31 Thread Peng Xia
Yes, we have just modified the configuration, and everything works fine.
Thanks very much for the help.
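
For anyone hitting the same thing, the setting in question looks roughly like
this (the paths are illustrative; note the override discussion below):

    // spark.local.dir should point at one or more large volumes (comma-separated).
    // It is overridden by SPARK_LOCAL_DIRS (standalone/Mesos) or LOCAL_DIRS (YARN).
    val conf = new SparkConf()
      .setAppName("test")
      .set("spark.local.dir", "/mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp")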

On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu  wrote:

> For YARN, possibly this one ?
>
> <property>
>   <name>yarn.nodemanager.local-dirs</name>
>   <value>/hadoop/yarn/local</value>
> </property>
>
> Cheers
>
> On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin 
> wrote:
>
>> IIRC you have to set that configuration on the Worker processes (for
>> standalone). The app can't override it (only for a client-mode
>> driver). YARN has a similar configuration, but I don't know the name
>> (shouldn't be hard to find, though).
>>
>> On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu 
>> wrote:
>> > Is it possible that `spark.local.dir` is overriden by others? The docs
>> say:
>> >
>> > NOTE: In Spark 1.0 and later this will be overriden by
>> > SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)
>> >
>> > On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia 
>> wrote:
>> >> Hi Sean,
>> >>
>> >> Thank very much for your reply.
>> >> I tried to config it from below code:
>> >>
>> >> sf = SparkConf().setAppName("test").set("spark.executor.memory",
>> >> "45g").set("spark.cores.max", 62),set("spark.local.dir", "C:\\tmp")
>> >>
>> >> But still get the error.
>> >> Do you know how I can config this?
>> >>
>> >>
>> >> Thanks,
>> >> Best,
>> >> Peng
>> >>
>> >>
>> >> On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen  wrote:
>> >>>
>> >>> It means pretty much what it says. You ran out of space on an executor
>> >>> (not driver), because the dir used for serialization temp files is
>> >>> full (not all volumes). Set spark.local.dirs to something more
>> >>> appropriate and larger.
>> >>>
>> >>> On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia 
>> wrote:
>> >>> > Hi
>> >>> >
>> >>> >
>> >>> > I was running a logistic regression algorithm on a 8 nodes spark
>> >>> > cluster,
>> >>> > each node has 8 cores and 56 GB Ram (each node is running a windows
>> >>> > system).
>> >>> > And the spark installation driver has 1.9 TB capacity. The dataset
>> I was
>> >>> > training on are has around 40 million records with around 6600
>> features.
>> >>> > But
>> >>> > I always get this error during the training process:
>> >>> >
>> >>> > Py4JJavaError: An error occurred while calling
>> >>> > o70.trainLogisticRegressionModelWithLBFGS.
>> >>> > : org.apache.spark.SparkException: Job aborted due to stage failure:
>> >>> > Task
>> >>> > 2709 in stage 3.0 failed 4 times, most recent failure: Lost task
>> 2709.3
>> >>> > in
>> >>> > stage 3.0 (TID 2766,
>> >>> > workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
>> >>> > java.io.IOException: There is not enough space on the disk
>> >>> > at java.io.FileOutputStream.writeBytes(Native Method)
>> >>> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
>> >>> > at
>> >>> > java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >>> > at
>> >>> >
>> >>> >
>> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
>> >>> > at
>> >>> >
>> >>> >
>> org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
>> >>> > at
>> >>> >
>> org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
>> >>> > at
>> >>> >
>> >>> >
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>> >>> > at
>> >>> >
>> >>> >
>> java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
>> >>> > at
>> >>> >
>> >>> >
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
>> >>> > at
>> >>> > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>> >>> > at
>> >>> >
>> >>> >
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>> >>> > at
>> >>> >
>> >>> >
>> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
>> >>> > at
>> >>> >
>> >>> >
>> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
>> >>> > at
>> >>> > org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
>> >>> > at
>> >>> > org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
>> >>> > at
>> >>> >
>> >>> >
>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>> >>> > at
>> >>> >
>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
>> >>> > at
>> >>> > org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>> >>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
>> >>> > at
>> >>> > org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>> >>> > at
>> >>> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
>> >>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>> >>> > at
>> >>> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> 

refer to dictionary

2015-03-31 Thread Peng Xia
Hi,

I have an RDD (rdd1) where each line is split into an array ["a", "b", "c"],
etc.
And I also have a local dictionary (dict1) that stores key-value pairs {"a": 1,
"b": 2, "c": 3}.
I want to replace the keys in the RDD with their corresponding values in
the dict:
rdd1.map(lambda line: [dict1[item] for item in line])

But this task is not distributed; I believe the reason is that dict1 is a
local instance.
Can anyone provide suggestions on how to parallelize this?


Thanks,
Best,
Peng


Re: Can't run spark-submit with an application jar on a Mesos cluster

2015-03-31 Thread hbogert
Well, those are only the logs of the slaves at the Mesos level. I'm not sure from
your reply whether you can ssh into a specific slave or not; if you can, you
should look at the actual output of the application (Spark in this case) on a
slave, e.g. in

/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/4/runs/e3cf195d-525b-4148-aa38-1789d378a948/std{err,out}

The actual UUIDs and the run number (in this example '4') in the path can differ
from slave node to slave node.

Look into those stderr and stdout files and you'll probably have your answer as
to why it is failing.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-run-spark-submit-with-an-application-jar-on-a-Mesos-cluster-tp22277p22319.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Sean Owen
I had always understood the formulation to be the first option you
describe. Lambda is scaled by the number of items the user has rated /
interacted with. I think the goal is to avoid fitting the tastes of
prolific users disproportionately just because they have many ratings
to fit. This is what's described in the ALS-WR paper we link to on the
Spark web site, in equation 5
(http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
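
For reference, equation 5 there (the weighted-lambda-regularization objective)
is roughly:

    f(U, M) = sum over observed (i,j) of (r_ij - u_i . m_j)^2
              + lambda * ( sum_i n_ui * ||u_i||^2 + sum_j n_mj * ||m_j||^2 )

where n_ui is the number of ratings by user i and n_mj is the number of ratings
on item j.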

I think this also gets you the scale-invariance? For every additional
rating from user i to product j, you add one new term to the
squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at least
both increasing about linearly as ratings increase. If the
regularization term is multiplied by the total number of users and
products in the model, then it's fixed.

I might misunderstand you and/or be speaking about something slightly
different when it comes to invariance. But FWIW I had always
understood the regularization to be multiplied by the number of
explicit ratings.

On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng  wrote:
> Okay, I didn't realize that I changed the behavior of lambda in 1.3.
> to make it "scale-invariant", but it is worth discussing whether this
> is a good change. In 1.2, we multiply lambda by the number ratings in
> each sub-problem. This makes it "scale-invariant" for explicit
> feedback. However, in implicit feedback model, a user's sub-problem
> contains all item factors. Then the question is whether we should
> multiply lambda by the number of explicit ratings from this user or by
> the total number of items. We used the former in 1.2 but changed to
> the latter in 1.3. So you should try a smaller lambda to get a similar
> result in 1.3.
>
> Sean and Shuo, which approach do you prefer? Do you know any existing
> work discussing this?
>
> Best,
> Xiangrui
>
>
> On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng  wrote:
>> This sounds like a bug ... Did you try a different lambda? It would be
>> great if you can share your dataset or re-produce this issue on the
>> public dataset. Thanks! -Xiangrui
>>
>> On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody  wrote:
>>> After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
>>> smaller factors (and hence scores). For example, the first few product's
>>> factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
>>> first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
>>> difference of several orders of magnitude is consistent throughout both user
>>> and product. The recommendations from 1.2.0 are subjectively much better
>>> than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less
>>> memory.
>>>
>>> My first thought is that there is too much regularization in the 1.3.0
>>> results, but I'm using the same lambda parameter value. This is a snippet of
>>> my scala code:
>>> .
>>> val rank = 75
>>> val numIterations = 15
>>> val alpha = 10
>>> val lambda = 0.01
>>> val model = ALS.trainImplicit(train_data, rank, numIterations,
>>> lambda=lambda, alpha=alpha)
>>> .
>>>
>>> The code and input data are identical across both versions. Did anything
>>> change between the two versions I'm not aware of? I'd appreciate any help!
>>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: refer to dictionary

2015-03-31 Thread Ted Yu
You can use a broadcast variable. 

See also this thread:
http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variable&subj=How+Broadcast+variable+scale+
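
A minimal sketch of that approach, shown in Scala for illustration (PySpark
exposes the same sc.broadcast(...) / .value API); the sample data just mirrors
the question:

    // sc is an existing SparkContext
    val rdd1  = sc.parallelize(Seq(Array("a", "b", "c"), Array("b", "c", "a")))
    val dict1 = Map("a" -> 1, "b" -> 2, "c" -> 3)

    // Ship the small local map to every executor once, instead of closing over it per task.
    val bcDict = sc.broadcast(dict1)

    val replaced = rdd1.map(line => line.map(item => bcDict.value(item)))
    replaced.collect().foreach(arr => println(arr.mkString(",")))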



> On Mar 31, 2015, at 4:43 AM, Peng Xia  wrote:
> 
> Hi,
> 
> I have an RDD (rdd1) where each line is split into an array ["a", "b", "c"], etc.
> I also have a local dictionary (dict1) that stores the key-value pairs {"a": 1, 
> "b": 2, "c": 3}.
> I want to replace the keys in the RDD with their corresponding values from the 
> dict:
> rdd1.map(lambda line: [dict1[item] for item in line])
> 
> But this task is not distributed; I believe the reason is that dict1 is a 
> local instance.
> Can anyone provide suggestions on how to parallelize this?
> 
> 
> Thanks,
> Best,
> Peng
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: can't union two rdds

2015-03-31 Thread roy
use zip



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-t-union-two-rdds-tp22320p22321.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark sql query fails with executor lost/ out of memory exception while caching a table

2015-03-31 Thread ankurjain.nitrr
Hi,

I am using spark 1.2.1

I am using thrift server to query my data.


while executing query "CACHE TABLE tablename"

Fails with exception

Error: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 10.0 (TID 41, bbr-dev178): ExecutorLostFailure (executor 12 lost)

and sometimes

Error: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 8.0 (TID 33, bbr-dev178): java.lang.OutOfMemoryError: Java heap space


I understand that my executors are going out of memory during the caching
and are therefore getting killed.

My questions are:

Is there a way to make the thrift server spill the data to disk if it is not
able to keep the entire dataset in memory?
Can I change the storage level the Spark SQL thrift server uses for caching?

I don't want my executors to get lost and the cache queries to fail.
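
Not an authoritative answer, but one workaround sometimes used outside of
CACHE TABLE is to persist the data with a storage level that can spill to disk
and register that as a temp table; a rough sketch in Scala (table names are
placeholders, and I have not verified how this interacts with the thrift server):

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.storage.StorageLevel

    val hiveContext = new HiveContext(sc)   // sc: an existing SparkContext

    // MEMORY_AND_DISK lets partitions that don't fit in memory spill to disk
    // instead of blowing up the executors.
    val table = hiveContext.sql("SELECT * FROM tablename")
    table.persist(StorageLevel.MEMORY_AND_DISK)
    table.registerTempTable("tablename_cached")
    table.count()   // force materialization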








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-query-fails-with-executor-lost-out-of-memory-expection-while-caching-a-table-tp22322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: can't union two rdds

2015-03-31 Thread ankurjain.nitrr
RDD union will result in  

  1 2 
  3 4 
  5 6 
  7 8 
  9 10 
11 12

What you are trying to do is a join.
There must be some logic/key on which to perform the join operation.

I think in your case you want the order (index) to be the joining key.
An RDD is a distributed data structure and is not really suited to this kind of
positional lookup.

If the amount of data is small, you can use rdd.collect on both RDDs, iterate
over the two lists, and produce the desired result (or see the sketch below).
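
If the data is too big to collect, a common alternative is to key both RDDs by
position and join on that key; a rough sketch in Scala (a and b stand for the
two RDDs, hypothetical names):

    import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark versions

    // Pair each element with its position, then join on that position.
    val byIndexA = a.zipWithIndex().map { case (v, i) => (i, v) }
    val byIndexB = b.zipWithIndex().map { case (v, i) => (i, v) }

    val combined = byIndexA.join(byIndexB)   // RDD[(index, (aValue, bValue))]
      .sortByKey()
      .values                                // back to RDD[(aValue, bValue)] in order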



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-t-union-two-rdds-tp22320p22323.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



"Ambiguous references" to a field set in a partitioned table AND the data

2015-03-31 Thread Nicolas Fouché
  Hi,


I save Parquet files in a partitioned table, so in /path/to/table/myfield=a/ .
But I also kept the field "myfield" in the Parquet data. Thus, when I query the 
field, I get this error:


df.select("myfield").show(10)
"Exception in thread "main" org.apache.spark.sql.AnalysisException: Ambiguous 
references to myfield  (myfield#2,List()),(myfield#47,List());"


Looking at the code, I could not find a way to explicitly specify which column 
I'd want. DataFrame#columns returns strings. Even by loading the data with a 
schema (StructType), I'm not sure I can do it.


Do I have to make sure that my partition field does not exist in the data 
before saving? Or is there a way to declare which column in the schema I want 
to query?


Thanks.
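
Not authoritative, but one way to avoid the clash is to drop the duplicated
column from the DataFrame before writing each partition directory; a sketch
against the 1.3 DataFrame API (the column and path are just the ones from the
question):

    // Keep every column except the partition field, so "myfield" only exists in the
    // directory name (myfield=a/) and not inside the Parquet files themselves.
    val withoutPartitionCol = df.select(
      df.columns.filterNot(_ == "myfield").map(df.col): _*)

    withoutPartitionCol.saveAsParquetFile("/path/to/table/myfield=a")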






Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-31 Thread Ted Yu
Can you show us the output of DStream#print() if you have it?

Thanks

On Tue, Mar 31, 2015 at 2:55 AM, Nicolas Phung 
wrote:

> Hello,
>
> @Akhil Das I'm trying to use the experimental API
> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
> .
> I'm reusing the same code snippet to initialize my topicSet.
>
> @Cody Koeninger I don't see any previous error messages (see the full log
> at the end). To create the topic, I'm doing the following :
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
> --partitions 10 --topic toto"
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
> --partitions 1 --topic toto-single"
>
> I'm launching my Spark Streaming in local mode.
>
> @Ted Yu There's no log "Couldn't connect to leader for topic", here's the
> full version :
>
> "spark-submit --conf config.resource=application-integration.conf --class
> nextgen.Main assembly-0.1-SNAPSHOT.jar
>
> 15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
> 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
> 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(nphung); users 
> with modify permissions: Set(nphung)
> 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
> 15/03/31 10:47:13 INFO Remoting: Starting remoting
> 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@int.local:44180]
> 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: 
> [akka.tcp://sparkDriver@int.local:44180]
> 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on 
> port 44180.
> 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
> 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at 
> /tmp/spark-local-20150331104713-2238
> 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
> 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is 
> /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
> 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
> 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' 
> on port 50204.
> 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
> 15/03/31 10:47:16 INFO SparkContext: Added JAR 
> file:/home/nphung/assembly-0.1-SNAPSHOT.jar at 
> http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 
> 1427791636151
> 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
> akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
> 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
> 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager 
> localhost:40630 with 265.1 MB RAM, BlockManagerId(, localhost, 40630)
> 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
> 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to 
> hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
> 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
> 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden 
> to
> 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is 
> overridden to
> 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = 
> StorageLevel(false, false, false, false, 1)
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 IN

Re: Anyone has some simple example with spark-sql with spark 1.3

2015-03-31 Thread Vincent He
It works, thanks for your great help.

On Mon, Mar 30, 2015 at 10:07 PM, Denny Lee  wrote:

> Hi Vincent,
>
> This may be a case that you're missing a semi-colon after your CREATE
> TEMPORARY TABLE statement.  I ran your original statement (missing the
> semi-colon) and got the same error as you did.  As soon as I added it in, I
> was good to go again:
>
> CREATE TEMPORARY TABLE jsonTable
> USING org.apache.spark.sql.json
> OPTIONS (
>   path "/samples/people.json"
> );
> -- above needed a semi-colon so the temporary table could be created first
> SELECT * FROM jsonTable;
>
> HTH!
> Denny
>
>
> On Sun, Mar 29, 2015 at 6:59 AM Vincent He 
> wrote:
>
>> No luck, it does not work. Does anyone know whether there is some special setting
>> for the spark-sql CLI so we do not need to write code to use Spark SQL? Does anyone
>> have a simple example of this? I'd appreciate any help. Thanks in advance.
>>
>> On Sat, Mar 28, 2015 at 9:05 AM, Ted Yu  wrote:
>>
>>> See
>>> https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html
>>>
>>> I haven't tried the SQL statements in above blog myself.
>>>
>>> Cheers
>>>
>>> On Sat, Mar 28, 2015 at 5:39 AM, Vincent He <
>>> vincent.he.andr...@gmail.com> wrote:
>>>
 thanks for your information. I have read it, and I can run the samples with
 scala or python, but for the spark-sql shell, I cannot get an example running
 successfully. Can you give me an example I can run with "./bin/spark-sql"
 without writing any code? thanks

 On Sat, Mar 28, 2015 at 7:35 AM, Ted Yu  wrote:

> Please take a look at
> https://spark.apache.org/docs/latest/sql-programming-guide.html
>
> Cheers
>
>
>
> > On Mar 28, 2015, at 5:08 AM, Vincent He <
> vincent.he.andr...@gmail.com> wrote:
> >
> >
> > I am learning Spark SQL and trying the spark-sql example. I ran the
> following code, but I got the exception "ERROR CliDriver:
> org.apache.spark.sql.AnalysisException: cannot recognize input near
> 'CREATE' 'TEMPORARY' 'TABLE' in ddl statement; line 1 pos 17". I have two
> questions:
> > 1. Do we have a list of the statement supported in spark-sql ?
> > 2. Does spark-sql shell support hiveql ? If yes, how to set?
> >
> > The example I tried:
> > CREATE TEMPORARY TABLE jsonTable
> > USING org.apache.spark.sql.json
> > OPTIONS (
> >   path "examples/src/main/resources/people.json"
> > )
> > SELECT * FROM jsonTable
> > The exception I got,
> > > CREATE TEMPORARY TABLE jsonTable
> >  > USING org.apache.spark.sql.json
> >  > OPTIONS (
> >  >   path "examples/src/main/resources/people.json"
> >  > )
> >  > SELECT * FROM jsonTable
> >  > ;
> > 15/03/28 17:38:34 INFO ParseDriver: Parsing command: CREATE
> TEMPORARY TABLE jsonTable
> > USING org.apache.spark.sql.json
> > OPTIONS (
> >   path "examples/src/main/resources/people.json"
> > )
> > SELECT * FROM jsonTable
> > NoViableAltException(241@[654:1: ddlStatement : (
> createDatabaseStatement | switchDatabaseStatement | dropDatabaseStatement 
> |
> createTableStatement | dropTableStatement | truncateTableStatement |
> alterStatement | descStatement | showStatement | metastoreCheck |
> createViewStatement | dropViewStatement | createFunctionStatement |
> createMacroStatement | createIndexStatement | dropIndexStatement |
> dropFunctionStatement | dropMacroStatement | analyzeStatement |
> lockStatement | unlockStatement | lockDatabase | unlockDatabase |
> createRoleStatement | dropRoleStatement | grantPrivileges |
> revokePrivileges | showGrants | showRoleGrants | showRolePrincipals |
> showRoles | grantRole | revokeRole | setRole | showCurrentRole );])
> > at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
> > at org.antlr.runtime.DFA.predict(DFA.java:144)
> > at
> org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2090)
> > at
> org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1398)
> > at
> org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1036)
> > at
> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:199)
> > at
> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
> > at
> org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227)
> > at
> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:241)
> > at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
> > at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
> > at
> scala.util.parsing.comb

Re: Actor not found

2015-03-31 Thread sparkdi
This is the whole output from the shell:

~/spark-1.3.0-bin-hadoop2.4$ sudo bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/30 19:00:40 INFO SecurityManager: Changing view acls to: root
15/03/30 19:00:40 INFO SecurityManager: Changing modify acls to: root
15/03/30 19:00:40 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
15/03/30 19:00:40 INFO HttpServer: Starting HTTP Server
15/03/30 19:00:40 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/30 19:00:40 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:47797
15/03/30 19:00:40 INFO Utils: Successfully started service 'HTTP class
server' on port 47797.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/03/30 19:00:42 INFO SparkContext: Running Spark version 1.3.0
15/03/30 19:00:42 INFO SecurityManager: Changing view acls to: root
15/03/30 19:00:42 INFO SecurityManager: Changing modify acls to: root
15/03/30 19:00:42 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
15/03/30 19:00:42 INFO Slf4jLogger: Slf4jLogger started
15/03/30 19:00:42 INFO Remoting: Starting remoting
15/03/30 19:00:43 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@vm:52574]
15/03/30 19:00:43 INFO Utils: Successfully started service 'sparkDriver' on
port 52574.
15/03/30 19:00:43 INFO SparkEnv: Registering MapOutputTracker
15/03/30 19:00:43 INFO SparkEnv: Registering BlockManagerMaster
15/03/30 19:00:43 INFO DiskBlockManager: Created local directory at
/tmp/spark-f71a8d86-6e49-4dfe-bb98-8e8581015acc/blockmgr-57532f5a-38db-4ba3-86d8-edef84f592e5
15/03/30 19:00:43 INFO MemoryStore: MemoryStore started with capacity 265.4
MB
15/03/30 19:00:43 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-95e0a143-0de3-4c96-861c-968c9fae2746/httpd-cb029cd6-4943-479d-9b56-e7397489d9ea
15/03/30 19:00:43 INFO HttpServer: Starting HTTP Server
15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/30 19:00:43 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:48500
15/03/30 19:00:43 INFO Utils: Successfully started service 'HTTP file
server' on port 48500.
15/03/30 19:00:43 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/30 19:00:43 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/03/30 19:00:43 INFO Utils: Successfully started service 'SparkUI' on port
4040.
15/03/30 19:00:43 INFO SparkUI: Started SparkUI at http://vm:4040
15/03/30 19:00:43 INFO Executor: Starting executor ID  on host
localhost
15/03/30 19:00:43 INFO Executor: Using REPL class URI:
http://10.11.204.80:47797
15/03/30 19:00:43 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@vm:52574/user/HeartbeatReceiver
15/03/30 19:00:43 ERROR OneForOneStrategy: Actor not found for:
ActorSelection[Anchor(akka://sparkDriver/deadLetters), Path(/)]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask

pyspark error with zip

2015-03-31 Thread Charles Hayden

The following program fails in the zip step.

x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
z = x.distinct()
print x.zip(y).collect()


The error that is produced depends on whether multiple partitions have been 
specified or not.

I understand that

the two RDDs [must] have the same number of partitions and the same number of 
elements in each partition.

What is the best way to work around this restriction?

I have been performing the operation with the following code, but I am hoping 
to find something more efficient.

def safe_zip(left, right):
    ix_left = left.zipWithIndex().map(lambda row: (row[1], row[0]))
    ix_right = right.zipWithIndex().map(lambda row: (row[1], row[0]))
    return ix_left.join(ix_right).sortByKey().values()




Re: Parquet Hive table become very slow on 1.3?

2015-03-31 Thread Zheng, Xudong
Thanks Cheng!

Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue, but
the PR 5231 seems not to. Not sure what else I did wrong ...

BTW, we are actually very interested in the schema merging feature in
Spark 1.3, and both of these solutions will disable that feature, right? It
seems that the Parquet metadata is stored in a file named _metadata in the
Parquet folder (each folder is a partition, as we use a partitioned table),
so why do we need to scan all the Parquet part-files? Is there any other
solution that could keep the schema merging feature at the same time? We
really like this feature :)
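
For anyone else hitting this, a minimal sketch of applying that workaround
from code (the table name is a placeholder):

    // Fall back to the old Parquet path for Hive metastore tables, which avoids
    // touching the footer of every part-file at planning time.
    sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

    sqlContext.sql("SELECT COUNT(*) FROM my_parquet_table").collect().foreach(println)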

On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian  wrote:

>  Hi Xudong,
>
> This is probably because of Parquet schema merging is turned on by
> default. This is generally useful for Parquet files with different but
> compatible schemas. But it needs to read metadata from all Parquet
> part-files. This can be problematic when reading Parquet files with lots of
> part-files, especially when the user doesn't need schema merging.
>
> This issue is tracked by SPARK-6575, and here is a PR for it:
> https://github.com/apache/spark/pull/5231. This PR adds a configuration
> to disable schema merging by default when doing Hive metastore Parquet
> table conversion.
>
> Another workaround is to fallback to the old Parquet code by setting
> spark.sql.parquet.useDataSourceApi to false.
>
> Cheng
>
>
> On 3/31/15 2:47 PM, Zheng, Xudong wrote:
>
> Hi all,
>
>  We are using a Parquet Hive table, and we are upgrading to Spark 1.3. But
> we find that just a simple COUNT(*) query is much slower (100x) than in
> Spark 1.2.
>
>  I find that most of the time is spent on the driver getting HDFS blocks. I
> see a large number of logs like the following printed:
>
>  15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 
> 2097ms
> 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
>   fileLength=77153436
>   underConstruction=false
>   
> blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>  getBlockSize()=77153436; corrupt=false; offset=0; 
> locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
>   
> lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>  getBlockSize()=77153436; corrupt=false; offset=0; 
> locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]}
>   isLastBlockComplete=true}
> 15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010
>
>
>  I compared the printed logs with Spark 1.2: although the number of
> getBlockLocations calls is similar, each such operation only cost 20~30
> ms there (but it is 2000ms~3000ms now), and it didn't print the detailed
> LocatedBlocks info.
>
>  Another finding: if I read the Parquet file via Scala code from
> spark-shell as below, it looks fine, and the computation returns the result
> as quickly as before.
>
>  sqlContext.parquetFile("data/myparquettable")
>
>
>  Any idea about it? Thank you!
>
>
>  --
>   郑旭东
> Zheng, Xudong
>
>
>


-- 
郑旭东
Zheng, Xudong


Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
GuoQiang's method works very well.

It only takes 1 TB of disk now.

Thank you very much!
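
For reference, a rough sketch of wiring up the checkpointing discussed in the
quoted thread below (assuming Spark 1.3.1+, where the ML ALS exposes
setCheckpointInterval; the path and values here are placeholders):

    import org.apache.spark.ml.recommendation.ALS

    // Checkpointing requires a checkpoint directory; with it, the lineage (and the
    // shuffle files it keeps alive) can be truncated every few iterations.
    sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")

    val als = new ALS()
      .setRank(75)
      .setMaxIter(15)
      .setRegParam(0.01)
      .setImplicitPrefs(true)
      .setCheckpointInterval(5)   // checkpoint the factor RDDs every 5 iterations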



> On Mar 31, 2015, at 4:47, GuoQiang Li  wrote:
> 
> You can try to enforce garbage collection:
> 
> /** Run GC and make sure it actually has run */
> def runGC() {
>   val weakRef = new WeakReference(new Object())
>   val startTime = System.currentTimeMillis
>   System.gc() // Make a best effort to run the garbage collection. It 
> *usually* runs GC.
>   // Wait until a weak reference object has been GCed
>   System.runFinalization()
>   while (weakRef.get != null) {
> System.gc()
> System.runFinalization()
> Thread.sleep(200)
> if (System.currentTimeMillis - startTime > 1) {
>   throw new Exception("automatically cleanup error")
> }
>   }
> }
> 
> 
> -------- Original message --------
> From: "lisendong" <lisend...@163.com>
> Date: Mar 31, 2015, 3:47
> To: "Xiangrui Meng" <men...@gmail.com>
> Cc: "Xiangrui Meng" <m...@databricks.com>; "user" <user@spark.apache.org>;
> "Sean Owen" <so...@cloudera.com>; "GuoQiang Li" <wi...@qq.com>
> Subject: Re: different result from implicit ALS with explicit ALS
> 
> I have updated my Spark source code to 1.3.1.
> 
> The checkpoint works well. 
> 
> BUT the shuffle data still could not be deleted automatically; the disk usage 
> is still 30 TB.
> 
> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
> 
> Do you know how to solve my problem?
> 
> Sendong Li
> 
> 
> 
>> On Mar 31, 2015, at 12:11, Xiangrui Meng  wrote:
>> 
>> setCheckpointInterval was added in the current master and branch-1.3. Please 
>> help check whether it works. It will be included in the 1.3.1 and 1.4.0 
>> release. -Xiangrui
>> 
>> On Mon, Mar 30, 2015 at 7:27 AM, lisendong > > wrote:
>> hi, xiangrui:
>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>> the code is :
>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>>  
>> 
>> 
>> 
>> the checkpoint is very important in my situation, because my task will 
>> produce 1 TB of shuffle data in each iteration; if the shuffle data is not 
>> deleted in each iteration (using checkpoint()), the task will produce 30 TB 
>> of data.
>> 
>> 
>> So I changed the ALS code and re-compiled it myself, but it seems the 
>> checkpoint does not take effect, and the task still occupies 30 TB of disk 
>> (I only added two lines to ALS.scala):
>> 
>> 
>> 
>> 
>> 
>> and the driver's log seems strange; why is the log printed together...
>> 
>> 
>> thank you very much!
>> 
>> 
>>> On Feb 26, 2015, at 11:33, 163  wrote:
>>> 
>>> Thank you very much for your opinion:)
>>> 
>>> In our case, maybe it's dangerous to treat un-observed items as negative 
>>> interactions (although we could give them a small confidence, I think they are 
>>> still not credible...)
>>> 
>>> I will do more experiments and give you feedback:)
>>> 
>>> Thank you;)
>>> 
>>> 
 On Feb 26, 2015, at 23:16, Sean Owen  wrote:
 
 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.
 
 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.
 
 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. I'd be careful about assuming the
 lack of an action carries signal.
 
> On Thu, Feb 26, 2015 at 3:07 PM, 163  > wrote:
> oh my god, I think I understood...
> In my case, there are three kinds of user-item pairs:
> 
> Display and click pair(positive pair)
> Display but no-click pair(negative pair)
> No-display pair(unobserved pair)
> 
> Explicit ALS only consider the first and the second kinds
> But implicit ALS consider all the three kinds of pair(and consider the 
> third
> kind as the second pair, because their preference value are all zero and
> confidence are all 1)
> 
> So the result are different. right?
> 
> Could you please give me some advice, which ALS should I use?
> If I use the implicit ALS, how to distinguish the second and the third 
> kind
> of pair:)
> 
> My opinion is in my case, I should use explicit ALS ...
> 
>>>

Hygienic closures for scala function serialization

2015-03-31 Thread Erik Erlandson

Under certain conditions, Scala will pull an entire class instance into a 
closure instead of just particular value members of that class, which can 
knee-cap Spark's serialization of functions in multiple ways.  I tripped over 
this last week, and wrote up the experience on my blog.  I've since noticed a 
couple other user descriptions of the same scenario, so maybe the post will be 
interesting to the community:

http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/
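
As a quick illustration of the kind of thing the post covers (a generic sketch,
not code taken from the post itself):

    import org.apache.spark.rdd.RDD

    class Scaler(val factor: Double) {                    // not Serializable
      // BAD: 'factor' is a field, so the closure captures 'this' and Spark
      // ends up trying to serialize the whole Scaler instance.
      def scaleCapturingThis(rdd: RDD[Double]): RDD[Double] =
        rdd.map(_ * factor)

      // BETTER: copy the needed value into a local val so only that value
      // is captured by the closure.
      def scaleHygienic(rdd: RDD[Double]): RDD[Double] = {
        val localFactor = factor
        rdd.map(_ * localFactor)
      }
    }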

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



"Ambiguous references" to a field set in a partitioned table AND the data

2015-03-31 Thread nfo
Hi,

I save Parquet files in a partitioned table, so in a path looking like
/path/to/table/myfield=a/ .
But I also kept the field "myfield" in the Parquet data. Thus, when I query
the field, I get this error:

df.select("myfield").show(10)
"Exception in thread "main" org.apache.spark.sql.AnalysisException:
Ambiguous references to myfield  (myfield#2,List()),(myfield#47,List());"

Looking at the code, I could not find a way to explicitly specify which
column I'd want. DataFrame#columns returns strings. Even by loading the data
with a schema (StructType), I'm not sure I can do it.

Do I have to make sure that my partition field does not exist in the
data before saving? Or is there a way to declare which column in the schema
I want to query?

Also, for the same reasons, if I try to persist() the data, I get this
error:

* Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
at parquet.bytes.BytesUtils.bytesToInt(BytesUtils.java:227)
at
parquet.column.statistics.IntStatistics.setMinMaxFromBytes(IntStatistics.java:46)
at
parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
at
parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:558)
at
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:492)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ambiguous-references-to-a-field-set-in-a-partitioned-table-AND-the-data-tp22325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: refer to dictionary

2015-03-31 Thread Peng Xia
Hi Ted,

Thanks very much, yea, using broadcast is much faster.

Best,
Peng

On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu  wrote:

> You can use broadcast variable.
>
> See also this thread:
>
> http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variable&subj=How+Broadcast+variable+scale+
>
>
>
> > On Mar 31, 2015, at 4:43 AM, Peng Xia  wrote:
> >
> > Hi,
> >
> > I have a RDD (rdd1)where each line is split into an array ["a", "b",
> "c], etc.
> > And I also have a local dictionary p (dict1) stores key value pair
> {"a":1, "b": 2, c:3}
> > I want to replace the keys in the rdd with the its corresponding value
> in the dict:
> > rdd1.map(lambda line: [dict1[item] for item in line])
> >
> > But this task is not distributed, I believe the reason is the dict1 is a
> local instance.
> > Can any one provide suggestions on this to parallelize this?
> >
> >
> > Thanks,
> > Best,
> > Peng
> >
>


How to set up a Spark Cluster?

2015-03-31 Thread bhushansc007
Hi All,

I am quite new to Spark. So please pardon me if it is a very basic question. 

I have set up a Hadoop cluster using Hortonworks' Ambari. It has 1 Master and
3 Worker nodes. Currently, it has the HDFS, YARN, MapReduce2, HBase and
ZooKeeper services installed. 

Now, I want to install Spark on it. How do I do that? I searched a lot
online, but there is no clear step-by-step installation guide for doing that.
All I find are standalone setup guides. Can someone provide the steps? What
needs to be copied to each machine? Where and what config changes should be
made on each machine?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-setup-a-Spark-Cluter-tp22326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Anny Chen
Hi Akhil,

I tried editing the /etc/hosts on the master and on the workers, and it seems
it is not working for me.

I tried adding   and it didn't work. I then tried
adding   and it didn't work either. I guess I should
also edit the spark-env.sh file?

Thanks!
Anny

On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das 
wrote:

> You can add an internal ip to public hostname mapping in your /etc/hosts
> file, if your forwarding is proper then it wouldn't be a problem there
> after.
>
>
>
> Thanks
> Best Regards
>
> On Tue, Mar 31, 2015 at 9:18 AM, anny9699  wrote:
>
>> Hi,
>>
>> For security reasons, we added a server between my aws Spark Cluster and
>> local, so I couldn't connect to the cluster directly. To see the SparkUI
>> and
>> its related work's  stdout and stderr, I used dynamic forwarding and
>> configured the SOCKS proxy. Now I could see the SparkUI using the
>> internal
>> ec2 ip, however when I click on the application UI (4040) or the worker's
>> UI
>> (8081), it still automatically uses the public DNS instead of internal ec2
>> ip, which the browser now couldn't show.
>>
>> Is there a way that I could configure this? I saw that one could configure
>> the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
>> help. Does anyone experience the same issue?
>>
>> Thanks a lot!
>> Anny
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Petar Zecevic


Did you try setting the SPARK_MASTER_IP parameter in spark-env.sh?


On 31.3.2015. 19:19, Anny Chen wrote:

Hi Akhil,

I tried editing the /etc/hosts on the master and on the workers, and 
seems it is not working for me.


I tried adding   and it didn't work. I then 
tried adding   and it didn't work either. I 
guess I should also edit the spark-env.sh file?


Thanks!
Anny

On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das 
mailto:ak...@sigmoidanalytics.com>> wrote:


You can add an internal ip to public hostname mapping in your
/etc/hosts file, if your forwarding is proper then it wouldn't be
a problem there after.



Thanks
Best Regards

On Tue, Mar 31, 2015 at 9:18 AM, anny9699 mailto:anny9...@gmail.com>> wrote:

Hi,

For security reasons, we added a server between my aws Spark
Cluster and
local, so I couldn't connect to the cluster directly. To see
the SparkUI and
its related work's  stdout and stderr, I used dynamic
forwarding and
configured the SOCKS proxy. Now I could see the SparkUI using
the  internal
ec2 ip, however when I click on the application UI (4040) or
the worker's UI
(8081), it still automatically uses the public DNS instead of
internal ec2
ip, which the browser now couldn't show.

Is there a way that I could configure this? I saw that one
could configure
the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether
this could
help. Does anyone experience the same issue?

Thanks a lot!
Anny




--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org








Re: How to set up a Spark Cluster?

2015-03-31 Thread Akhil Das
Its pretty simple, pick one machine as master (say machine A), and lets
call the workers are B,C, and D

*Login to A:*

- Enable password-less authentication (ssh-keygen)
   - Add A's ~/.ssh/id_rsa.pub to B,C,D's ~/.ssh/authorized_keys file

- Download spark binary (that supports your hadoop version) from
https://spark.apache.org/downloads.html (eg: wget
http://d3kbcqa49mib13.cloudfront.net/spark-1.3.0-bin-hadoop2.4.tgz)
- Extract it (tar xf spark*tgz)
- cd spark-1.3.0-bin-hadoop2.4;cp conf/spark-env.sh.template
conf/spark-env.sh
- vi conf/spark-env.sh : Now configure SPARK_MASTER_IP, SPARK_WORKER_CORES,
SPARK_WORKER_MEMORY as the resources you have.
- vi conf/slaves : Add B,C,D hostnames/ipaddress line by line

- cd ../;
- rsync -za spark-1.3.0-bin-hadoop2.4 B:
- rsync -za spark-1.3.0-bin-hadoop2.4 C:
- rsync -za spark-1.3.0-bin-hadoop2.4 D:
- cd spark-1.3.0-bin-hadoop2.4;sbin/start-all.sh

Now your cluster is up and running, just be careful with your firewall
entries. If you open up all ports then anyone can take over your cluster.
:) Read more : https://www.sigmoid.com/securing-apache-spark-cluster/





Thanks
Best Regards

On Tue, Mar 31, 2015 at 10:26 PM, bhushansc007 
wrote:

> Hi All,
>
> I am quite new to Spark. So please pardon me if it is a very basic
> question.
>
> I have setup a Hadoop cluster using Hortonwork's Ambari. It has 1 Master
> and
> 3 Worker nodes. Currently, it has HDFS, Yarn, MapReduce2, HBase and
> ZooKeeper services installed.
>
> Now, I want to install Spark on it. How do I do that? I searched a lot
> online, but there is no clear step-by-step installation guide to do that.
> All I find is the standalone setup guides. Can someone provide steps? What
> needs to be copied to each machine? Where and what config changes should be
> made on each machine?
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-setup-a-Spark-Cluter-tp22326.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Akhil Das
When you say you added  , were you able to ping any
of these from the machine?

You could try setting SPARK_LOCAL_IP on all machines. But make sure you
will be able to bind to that host/ip specified there.


Thanks
Best Regards

On Tue, Mar 31, 2015 at 10:49 PM, Anny Chen  wrote:

> Hi Akhil,
>
> I tried editing the /etc/hosts on the master and on the workers, and seems
> it is not working for me.
>
> I tried adding   and it didn't work. I then tried
> adding   and it didn't work either. I guess I should
> also edit the spark-env.sh file?
>
> Thanks!
> Anny
>
> On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das 
> wrote:
>
>> You can add an internal ip to public hostname mapping in your /etc/hosts
>> file, if your forwarding is proper then it wouldn't be a problem there
>> after.
>>
>>
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Mar 31, 2015 at 9:18 AM, anny9699  wrote:
>>
>>> Hi,
>>>
>>> For security reasons, we added a server between my aws Spark Cluster and
>>> local, so I couldn't connect to the cluster directly. To see the SparkUI
>>> and
>>> its related work's  stdout and stderr, I used dynamic forwarding and
>>> configured the SOCKS proxy. Now I could see the SparkUI using the
>>> internal
>>> ec2 ip, however when I click on the application UI (4040) or the
>>> worker's UI
>>> (8081), it still automatically uses the public DNS instead of internal
>>> ec2
>>> ip, which the browser now couldn't show.
>>>
>>> Is there a way that I could configure this? I saw that one could
>>> configure
>>> the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
>>> help. Does anyone experience the same issue?
>>>
>>> Thanks a lot!
>>> Anny
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Anny Chen
Hi Akhil,

Thanks for the explanation! I could ping the worker from the master using
either the host name or the internal IP; however, I am a little confused about
why setting SPARK_LOCAL_IP would help.

Thanks!
Anny

On Tue, Mar 31, 2015 at 10:36 AM, Akhil Das 
wrote:

> When you say you added  , where you able to ping
> any of these from the machine?
>
> You could try setting SPARK_LOCAL_IP on all machines. But make sure you
> will be able to bind to that host/ip specified there.
>
>
> Thanks
> Best Regards
>
> On Tue, Mar 31, 2015 at 10:49 PM, Anny Chen  wrote:
>
>> Hi Akhil,
>>
>> I tried editing the /etc/hosts on the master and on the workers, and
>> seems it is not working for me.
>>
>> I tried adding   and it didn't work. I then tried
>> adding   and it didn't work either. I guess I should
>> also edit the spark-env.sh file?
>>
>> Thanks!
>> Anny
>>
>> On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das 
>> wrote:
>>
>>> You can add an internal ip to public hostname mapping in your /etc/hosts
>>> file, if your forwarding is proper then it wouldn't be a problem there
>>> after.
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Mar 31, 2015 at 9:18 AM, anny9699  wrote:
>>>
 Hi,

 For security reasons, we added a server between my aws Spark Cluster and
 local, so I couldn't connect to the cluster directly. To see the
 SparkUI and
 its related work's  stdout and stderr, I used dynamic forwarding and
 configured the SOCKS proxy. Now I could see the SparkUI using the
 internal
 ec2 ip, however when I click on the application UI (4040) or the
 worker's UI
 (8081), it still automatically uses the public DNS instead of internal
 ec2
 ip, which the browser now couldn't show.

 Is there a way that I could configure this? I saw that one could
 configure
 the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this
 could
 help. Does anyone experience the same issue?

 Thanks a lot!
 Anny




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: Broadcasting a parquet file using spark and python

2015-03-31 Thread Michael Armbrust
In Spark 1.3 I would expect this to happen automatically when the Parquet
table is small (< 10 MB, configurable with
spark.sql.autoBroadcastJoinThreshold).
If you are running 1.3 and not seeing this, can you show the code you are
using to create the table?
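
For reference, a small sketch of raising the threshold and checking the
resulting plan (the table names are placeholders; the same key can also be set
with a "SET key=value" statement):

    // Allow tables up to ~50 MB (value is in bytes) to be broadcast in joins.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

    val joined = sqlContext.sql(
      "SELECT * FROM big_table b JOIN small_table s ON b.key = s.key")

    // If the smaller side is under the threshold, the plan should show a broadcast join.
    println(joined.queryExecution.executedPlan)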

On Tue, Mar 31, 2015 at 3:25 AM, jitesh129  wrote:

> How can we implement a BroadcastHashJoin for spark with python?
>
> My SparkSQL inner joins are taking a lot of time since it is performing
> ShuffledHashJoin.
>
> Tables on which join is performed are stored as parquet files.
>
> Please help.
>
> Thanks and regards,
> Jitesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread Todd Nist
I am accessing ElasticSearch via the elasticsearch-hadoop and attempting to
expose it via SparkSQL. I am using spark 1.2.1, latest supported by
elasticsearch-hadoop, and "org.elasticsearch" % "elasticsearch-hadoop" %
"2.1.0.BUILD-SNAPSHOT" of elasticsearch-hadoop. I’m
encountering an issue when I attempt to query the following json after
creating a temporary table from it. The json looks like this:

PUT /_template/device
{
  "template": "dev*",
  "settings": {
"number_of_shards": 1
  },
  "mappings": {
"metric": {
  "_timestamp" : {
"enabled" : true,
"stored" : true,
"path" : "timestamp",
"format" : "-MM-dd'T'HH:mm:ssZZ"
  },
  "properties": {
"pathId": {
  "type": "string"
},
"pathElements": {
  "properties": {
"node": {
  "type": "string"
},
"value": {
  "type": "string"
}
  }
},
"name": {
  "type": "string"
},
"value": {
  "type": "double"
},
"timestamp": {
  "type": "date",
  "store": true
}
  }
}
  }
}

Querying all columns works fine except for pathElements, which is a JSON
array. If this is added to the select, it fails with a
java.util.NoSuchElementException:
key not found: node.

*Details*.

The program is pretty basic, looks like this:

/**
 * A simple sample to read and write to ES using elasticsearch-hadoop.
 */

package com.opsdatastore.elasticsearch.spark

import java.io.File


// Scala imports
import scala.collection.JavaConversions._
// Spark imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

// OpsDataStore
import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
conf.set("es.nodes", ElasticSearch.Nodes)
conf.set("es.port", ElasticSearch.HttpPort.toString())
conf.set("es.index.auto.create", "true");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.executor.memory","1g")
conf.set("spark.kryoserializer.buffer.mb","256")

val sparkContext = new SparkContext(conf)
    sparkContext.addJar(Spark.JarPath + jar)
sparkContext
  }


  def main(args: Array[String]) {

val sc = sparkInit

val sqlContext = new SQLContext(sc)
import sqlContext._

val start = System.currentTimeMillis()

// specific query, just read all for now
sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")

/*
 * Read from ES and provide some insight with Spark & SparkSQL
 */
val esData = sc.esRDD("device/metric")

esData.collect.foreach(println(_))

val end = System.currentTimeMillis()
println(s"Total time: ${end-start} ms")

println("Create Metric Temporary Table for querying")
val schemaRDD = sqlContext.sql(
  "CREATE TEMPORARY TABLE metric " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'device/metric')  " )

System.out.println("")
System.out.println("#  Scheam Definition   #")
System.out.println("")
schemaRDD.printSchema()

System.out.println("")
System.out.println("#  Data from SparkSQL  #")
System.out.println("")

sqlContext.sql("SELECT path, pathElements, `timestamp`, name,
value FROM metric").collect.foreach(println(_))
  }
}

So this works fine:

sc.esRDD("device/metric")
esData.collect.foreach(println(_))

And results in this:

15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrite.scala:67, took 4.948556 s
(AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State,
value -> PA), Map(node -> City, value -> Pittsburgh), Map(node ->
Street, value -> 12345 Westbrook Drive), Map(node -> level, value ->
main), Map(node -> device, value -> thermostat)), value ->
29.590943279257175, name -> Current Temperature, timestamp ->
2015-03-27T14:53:46+, path -> /PA/Pittsburgh/12345 Westbrook
Drive/main/theromostat-1))

Yet this fails:

sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value
FROM metric").collect.foreach(println(_))

With this exception:

Create Metric Temporary Table for querying
#  Scheam Definition   #
root
#  Data from SparkSQL  #

Re: why "Shuffle Write" is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Udit Mehta
I have noticed a similar issue when using spark streaming. The spark
shuffle write size increases to a large size(in GB) and then the app
crashes saying:
java.io.FileNotFoundException:
/data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
(No such file or directory)

I don't understand why the shuffle size increases to such a large value for
long-running jobs.

Thanks,
Udiy

On Mon, Mar 30, 2015 at 4:01 AM, shahab  wrote:

> Thanks Saisai. I will try your solution, but I still don't understand why
> the filesystem should be used when there is plenty of memory available!
>
>
>
> On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao 
> wrote:
>
>> Shuffle write will finally spill the data into file system as a bunch of
>> files. If you want to avoid disk write, you can mount a ramdisk and
>> configure "spark.local.dir" to this ram disk. So shuffle output will write
>> to memory based FS, and will not introduce disk IO.
>>
>> Thanks
>> Jerry
>>
>> 2015-03-30 17:15 GMT+08:00 shahab :
>>
>>> Hi,
>>>
>>> I was looking at the SparkUI Executors tab, and I noticed that I have 597 MB
>>> of "Shuffle Write" while I am using a cached temp-table and Spark had 2 GB
>>> free memory (the number under Memory Used is 597 MB / 2.6 GB) ?!!!
>>>
>>> Shouldn't Shuffle Write be zero and all (map/reduce) tasks be
>>> done in memory?
>>>
>>> best,
>>>
>>> /Shahab
>>>
>>
>>
>


java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Jeetendra Gangele
When I am trying to get the result from HBase and run the mapToPair
function on the RDD, it's giving the error
java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

Here is the code

// private static JavaPairRDD<Integer, Result>
// getCompanyDataRDD(JavaSparkContext sc) throws IOException {
// return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
// TableInputFormat.class, ImmutableBytesWritable.class,
// Result.class).mapToPair(new
// PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
//
// public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
// System.out.println("In getCompanyDataRDD"+t._2);
//
// String cknid = Bytes.toString(t._1.get());
// System.out.println("processing cknids is:"+cknid);
// Integer cknidInt = Integer.parseInt(cknid);
// Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
// return returnTuple;
// }
// });
// }


Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Sean Owen
Yep, it's not serializable:
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html

You can't return this from a distributed operation since that would
mean it has to travel over the network and you haven't supplied any
way to convert the thing into bytes.

On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele  wrote:
> When I am trying to get the result from Hbase and running mapToPair function
> of RRD its giving the error
> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
>
> Here is the code
>
> // private static JavaPairRDD
> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
> // return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
> TableInputFormat.class, ImmutableBytesWritable.class,
> //Result.class).mapToPair(new
> PairFunction, Integer, Result>() {
> //
> // public Tuple2 call(Tuple2 Result> t) throws Exception {
> // System.out.println("In getCompanyDataRDD"+t._2);
> //
> // String cknid = Bytes.toString(t._1.get());
> // System.out.println("processing cknids is:"+cknid);
> // Integer cknidInt = Integer.parseInt(cknid);
> // Tuple2 returnTuple = new Tuple2 Result>(cknidInt, t._2);
> // return returnTuple;
> // }
> // });
> // }

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Nan Zhu
The example in 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
 might help

Best, 

-- 
Nan Zhu
http://codingcat.me


On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote:

> Yep, it's not serializable:
> https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html
> 
> You can't return this from a distributed operation since that would
> mean it has to travel over the network and you haven't supplied any
> way to convert the thing into bytes.
> 
> On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele  (mailto:gangele...@gmail.com)> wrote:
> > When I am trying to get the result from Hbase and running mapToPair function
> > of RRD its giving the error
> > java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
> > 
> > Here is the code
> > 
> > // private static JavaPairRDD
> > getCompanyDataRDD(JavaSparkContext sc) throws IOException {
> > // return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
> > TableInputFormat.class, ImmutableBytesWritable.class,
> > // Result.class).mapToPair(new
> > PairFunction, Integer, Result>() {
> > //
> > // public Tuple2 call(Tuple2 > Result> t) throws Exception {
> > // System.out.println("In getCompanyDataRDD"+t._2);
> > //
> > // String cknid = Bytes.toString(t._1.get());
> > // System.out.println("processing cknids is:"+cknid);
> > // Integer cknidInt = Integer.parseInt(cknid);
> > // Tuple2 returnTuple = new Tuple2 > Result>(cknidInt, t._2);
> > // return returnTuple;
> > // }
> > // });
> > // }
> > 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> (mailto:user-unsubscr...@spark.apache.org)
> For additional commands, e-mail: user-h...@spark.apache.org 
> (mailto:user-h...@spark.apache.org)
> 
> 




Re: "Spark-events does not exist" error, while it does with all the req. rights

2015-03-31 Thread Marcelo Vanzin
Hmmm... could you try to set the log dir to
"file:/home/hduser/spark/spark-events"?

I checked the code and it might be the case that the behaviour changed
between 1.2 and 1.3...
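
For what it's worth, a minimal sketch of setting this programmatically (the
same two properties, spark.eventLog.enabled and spark.eventLog.dir, can also
go in spark-defaults.conf):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("event-log-test")   // hypothetical app name
      .set("spark.eventLog.enabled", "true")
      // Use an explicit file: URI; a literal "~" in the path is not expanded.
      .set("spark.eventLog.dir", "file:/home/hduser/spark/spark-events")

    val sc = new SparkContext(conf)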

On Mon, Mar 30, 2015 at 6:44 PM, Tom Hubregtsen  wrote:
> The stack trace for the first scenario and your suggested improvement is
> similar, with as only difference the first line (Sorry for not including
> this):
> "Log directory /home/hduser/spark/spark-events does not exist."
>
> To verify your premises, I cd'ed into the directory by copy pasting the path
> listed in the error message (i, ii), created a text file, closed it an
> viewed it, and deleted it (iii). My findings were reconfirmed by my
> colleague. Any other ideas?
>
> Thanks,
>
> Tom
>
>
> On 30 March 2015 at 19:19, Marcelo Vanzin  wrote:
>>
>> So, the error below is still showing the invalid configuration.
>>
>> You mentioned in the other e-mails that you also changed the
>> configuration, and that the directory really, really exists. Given the
>> exception below, the only ways you'd get the error with a valid
>> configuration would be if (i) the directory didn't exist, (ii) it
>> existed but the user could not navigate to it or (iii) it existed but
>> was not actually a directory.
>>
>> So please double-check all that.
>>
>> On Mon, Mar 30, 2015 at 5:11 PM, Tom Hubregtsen 
>> wrote:
>> > Stack trace:
>> > 15/03/30 17:37:30 INFO storage.BlockManagerMaster: Registered
>> > BlockManager
>> > Exception in thread "main" java.lang.IllegalArgumentException: Log
>> > directory
>> > ~/spark/spark-events does not exist.
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why "Shuffle Write" is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Bijay Pathak
The Spark sort-based shuffle (introduced in 1.1, the default since 1.2) keeps
the data from each map task in memory until it no longer fits, after which it
is sorted and spilled to disk. You can reduce the shuffle writes to disk by
increasing spark.shuffle.memoryFraction (default 0.2).

By writing the shuffle output to disk, the Spark lineage can be
truncated when the RDDs are already materialized as the side effect
of an earlier shuffle. This is an under-the-hood optimization in Spark
which is only possible because the shuffle output is written to disk.

You can set spark.shuffle.spill to false if you don't want to spill to
disk, assuming you have enough heap memory.
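
A minimal sketch of setting those two knobs when building the SparkConf (the
values here are illustrative, not recommendations):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("shuffle-tuning-example")   // hypothetical app name
      // Give the shuffle a larger slice of the heap before it starts spilling (default 0.2).
      .set("spark.shuffle.memoryFraction", "0.4")
      // Only disable spilling if the shuffle data is known to fit in memory.
      .set("spark.shuffle.spill", "false")

    val sc = new SparkContext(conf)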

On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta  wrote:
> I have noticed a similar issue when using spark streaming. The spark shuffle
> write size increases to a large size(in GB) and then the app crashes saying:
> java.io.FileNotFoundException:
> /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
> (No such file or directory)
>
> I dont understand why the shuffle size increases to such a large value for
> long running jobs.
>
> Thanks,
> Udiy
>
> On Mon, Mar 30, 2015 at 4:01 AM, shahab  wrote:
>>
>> Thanks Saisai. I will try your solution, but still i don't understand why
>> filesystem should be used where there is a plenty of memory available!
>>
>>
>>
>> On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao 
>> wrote:
>>>
>>> Shuffle write will finally spill the data into file system as a bunch of
>>> files. If you want to avoid disk write, you can mount a ramdisk and
>>> configure "spark.local.dir" to this ram disk. So shuffle output will write
>>> to memory based FS, and will not introduce disk IO.
>>>
>>> Thanks
>>> Jerry
>>>
>>> 2015-03-30 17:15 GMT+08:00 shahab :

 Hi,

 I was looking at SparkUI, Executors, and I noticed that I have 597 MB
 for  "Shuffle while I am using cached temp-table and the Spark had 2 GB 
 free
 memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!

 Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be
 done in memory?

 best,

 /Shahab
>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Using 'fair' scheduler mode

2015-03-31 Thread asadrao
Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the
first query is a very expensive query (ex: ‘select *’ on a really big data
set) then any subsequent query seems to get blocked. I would have expected
the second query to run in parallel since I am using the ‘fair’ scheduler
mode, not ‘fifo’. I am submitting the query through the thrift server.
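
For context, a minimal sketch of what enabling fair scheduling with pools usually
involves; the pool name and allocation-file path are placeholders, not taken from
the original post:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // optional pool definitions
val sc = new SparkContext(conf)

// assign subsequent jobs in this thread/session to a pool before submitting work
sc.setLocalProperty("spark.scheduler.pool", "adhoc")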



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--driver-memory parameter doesn't work for spark-submit on yarn?

2015-03-31 Thread Shuai Zheng
Hi All,

 

Below is my shell script:

 

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G
--master yarn-client --class com.***.FinancialEngineExecutor
/home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties 

 

My driver will load some resources and then broadcast them to all executors.

 

That resource is only 600MB in serialized form, but I always get an out-of-memory
exception; it looks like the right amount of memory isn't being allocated to my
driver.

 

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

at java.lang.reflect.Array.newArray(Native Method)

at java.lang.reflect.Array.newInstance(Array.java:70)

at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)

at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)

at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

at
com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

 

Am I doing anything wrong here? 

 

And no matter what I set the --driver-memory value to (from 512M to 20G),
it always gives me an error on the same line (the line that tries to load a 600MB
Java serialization file). So it looks like the script doesn't allocate the right
memory to the driver in my case?
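
As a quick, hedged check, printing the max heap at the top of the driver's main()
shows how much memory the driver JVM actually received:

// roughly 5 GB expected when --driver-memory 5G has taken effect
val maxHeapMb = Runtime.getRuntime.maxMemory() / (1024 * 1024)
println(s"Driver max heap: $maxHeapMb MB")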

 

Regards,

 

Shuai



Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Ted Yu
Jeetendra:
Please extract the information you need from Result and return the
extracted portion - instead of returning Result itself.
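
A minimal sketch of that approach, assuming an HBase column family "cf" with a
qualifier "name" (both placeholders), so that only plain serializable values are
returned from the transformation:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// turn (rowkey, Result) into a simple, serializable tuple before it leaves the task
def extractFields(t: (ImmutableBytesWritable, Result)): (Int, String) = {
  val cknid = Bytes.toString(t._1.get()).toInt
  val name  = Bytes.toString(t._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
  (cknid, name)
}

// hbaseRdd: RDD[(ImmutableBytesWritable, Result)] from newAPIHadoopRDD
// val extracted = hbaseRdd.map(extractFields)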

Cheers

On Tue, Mar 31, 2015 at 1:14 PM, Nan Zhu  wrote:

> The example in
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
>  might
> help
>
> Best,
>
> --
> Nan Zhu
> http://codingcat.me
>
> On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote:
>
> Yep, it's not serializable:
> https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html
>
> You can't return this from a distributed operation since that would
> mean it has to travel over the network and you haven't supplied any
> way to convert the thing into bytes.
>
> On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele 
> wrote:
>
> When I am trying to get the result from HBase and running the mapToPair function
> of the RDD, it's giving the error
> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
>
> Here is the code
>
> // private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
> //   return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
> //       TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
> //     .mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
> //
> //       public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
> //         System.out.println("In getCompanyDataRDD" + t._2);
> //
> //         String cknid = Bytes.toString(t._1.get());
> //         System.out.println("processing cknids is:" + cknid);
> //         Integer cknidInt = Integer.parseInt(cknid);
> //         Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
> //         return returnTuple;
> //       }
> //     });
> // }
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Query REST web service with Spark?

2015-03-31 Thread Minnow Noir
We have some data on Hadoop that needs to be augmented with data only
available to us via a REST service.  We're using Spark to search for, and
correct, missing data. Even though there are a lot of records to scour for
missing data, the total number of calls to the service is expected to be
low, so it would be ideal to do the whole job in Spark as we scour the data.

I don't see anything obvious in the API or on Google relating to making
REST calls from a Spark job.  Is it possible?

Thanks,

Alec


Re: why "Shuffle Write" is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Udit Mehta
Thanks for the reply.
This will reduce the shuffle write to disk to an extent, but for a long-running
job (multiple days) the shuffle write would still occupy a lot of
space on disk. Why do we need to store the data from older map tasks in
memory?

On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak 
wrote:

> The Spark Sort-Based Shuffle (default from 1.1) keeps the data from
> each Map tasks to memory until they they can't fit after which they
> are sorted and spilled to disk. You can reduce the Shuffle write to
> disk by increasing spark.shuffle.memoryFraction(default 0.2).
>
> By writing the shuffle output to disk the Spark lineage can be
> truncated when the RDDs are already materialized as the side-effects
> of earlier shuffle.This is the under the hood optimization in Spark
> which is only possible because of shuffle output output being written
> to disk.
>
> You can set spark.shuffle.spill to false if you don't want to spill to
> the disk and assuming you have enough heap memory.
>
> On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta  wrote:
> > I have noticed a similar issue when using spark streaming. The spark
> shuffle
> > write size increases to a large size(in GB) and then the app crashes
> saying:
> > java.io.FileNotFoundException:
> >
> /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
> > (No such file or directory)
> >
> > I dont understand why the shuffle size increases to such a large value
> for
> > long running jobs.
> >
> > Thanks,
> > Udiy
> >
> > On Mon, Mar 30, 2015 at 4:01 AM, shahab  wrote:
> >>
> >> Thanks Saisai. I will try your solution, but still i don't understand
> why
> >> filesystem should be used where there is a plenty of memory available!
> >>
> >>
> >>
> >> On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao 
> >> wrote:
> >>>
> >>> Shuffle write will finally spill the data into file system as a bunch
> of
> >>> files. If you want to avoid disk write, you can mount a ramdisk and
> >>> configure "spark.local.dir" to this ram disk. So shuffle output will
> write
> >>> to memory based FS, and will not introduce disk IO.
> >>>
> >>> Thanks
> >>> Jerry
> >>>
> >>> 2015-03-30 17:15 GMT+08:00 shahab :
> 
>  Hi,
> 
>  I was looking at SparkUI, Executors, and I noticed that I have 597 MB
>  for  "Shuffle while I am using cached temp-table and the Spark had 2
> GB free
>  memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!
> 
>  Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks
> be
>  done in memory?
> 
>  best,
> 
>  /Shahab
> >>>
> >>>
> >>
> >
>


Did anybody run Spark-perf on powerpc?

2015-03-31 Thread Tom
We verified it runs on x86, and are now trying to run it on PowerPC. We
currently run into dependency trouble with sbt. I tried installing sbt by
hand and resolving all dependencies by hand, but I must have made an error, as
I still get errors.

Original error:
Getting org.scala-sbt sbt 0.13.6 ...

:: problems summary ::
 WARNINGS
module not found: org.scala-sbt#sbt;0.13.6

 local: tried

  /home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

  -- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:

  /home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/jars/sbt.jar

 typesafe-ivy-releases: tried

 
https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

 Maven Central: tried

  https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom

  -- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:

  https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar

::

::  UNRESOLVED DEPENDENCIES ::

::

:: org.scala-sbt#sbt;0.13.6: not found

::


 ERRORS
Server access Error: Received fatal alert: decrypt_error
url=https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed:
java.security.cert.CertPathValidatorException: The certificate issued by
CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not
trusted; internal cause is: 
java.security.cert.CertPathValidatorException: Certificate chaining 
error
url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom

Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed:
java.security.cert.CertPathValidatorException: The certificate issued by
CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not
trusted; internal cause is: 
java.security.cert.CertPathValidatorException: Certificate chaining 
error
url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Query REST web service with Spark?

2015-03-31 Thread Burak Yavuz
Hi,

If I recall correctly, I've read people integrating REST calls to Spark
Streaming jobs in the user list. I don't imagine any cases for why it
shouldn't be possible.

Best,
Burak

On Tue, Mar 31, 2015 at 1:46 PM, Minnow Noir  wrote:

> We have have some data on Hadoop that needs augmented with data only
> available to us via a REST service.  We're using Spark to search for, and
> correct, missing data. Even though there are a lot of records to scour for
> missing data, the total number of calls to the service is expected to be
> low, so it would be ideal to do the whole job in Spark as we scour the data.
>
> I don't see anything obvious in the API or on Google relating to making
> REST calls from a Spark job.  Is it possible?
>
> Thanks,
>
> Alec
>


Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread Xiangrui Meng
Hey Guoqiang and Sendong,

Could you comment on the overhead of calling gc() explicitly? The shuffle
files should get cleaned in a few seconds after checkpointing, but it is
certainly possible to accumulates TBs of files in a few seconds. In this
case, calling gc() may work the same as waiting for a few seconds after
each checkpoint. Is it correct?

Best,
Xiangrui

On Tue, Mar 31, 2015 at 8:58 AM, lisendong  wrote:

> guoqiang ’s method works very well …
>
> it only takes 1TB disk now.
>
> thank you very much!
>
>
>
> 在 2015年3月31日,下午4:47,GuoQiang Li  写道:
>
> You can try to enforce garbage collection:
>
> /** Run GC and make sure it actually has run */
> def runGC() {
>   val weakRef = new WeakReference(new Object())
>   val startTime = System.currentTimeMillis
>   System.gc() // Make a best effort to run the garbage collection. It
> *usually* runs GC.
>   // Wait until a weak reference object has been GCed
>   System.runFinalization()
>   while (weakRef.get != null) {
> System.gc()
> System.runFinalization()
> Thread.sleep(200)
> if (System.currentTimeMillis - startTime > 1) {
>   throw new Exception("automatically cleanup error")
> }
>   }
> }
>
>
> -- 原始邮件 --
> *发件人:* "lisendong";
> *发送时间:* 2015年3月31日(星期二) 下午3:47
> *收件人:* "Xiangrui Meng";
> *抄送:* "Xiangrui Meng"; "user";
> "Sean Owen"; "GuoQiang Li";
> *主题:* Re: different result from implicit ALS with explicit ALS
>
> I have update my spark source code to 1.3.1.
>
> the checkpoint works well.
>
> BUT the shuffle data still could not be delete automatically…the disk
> usage is still 30TB…
>
> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
>
> Do you know how to solve my problem?
>
> Sendong Li
>
>
>
> 在 2015年3月31日,上午12:11,Xiangrui Meng  写道:
>
> setCheckpointInterval was added in the current master and branch-1.3.
> Please help check whether it works. It will be included in the 1.3.1 and
> 1.4.0 release. -Xiangrui
>
> On Mon, Mar 30, 2015 at 7:27 AM, lisendong  wrote:
>
>> hi, xiangrui:
>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>> the code is :
>>
>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>> 
>>
>> the checkpoint is very important in my situation, because my task will
>> produce 1TB shuffle data in each iteration, it the shuffle data is not
>> deleted in each iteration(using checkpoint()), the task will produce 30TB
>> data…
>>
>>
>> So I change the ALS code, and re-compile by myself, but it seems the
>> checkpoint does not take effects, and the task still occupy 30TB disk… ( I
>> only add two lines to the ALS.scala) :
>>
>> 
>>
>>
>>
>> and the driver’s log seems strange, why the log is printed together...
>> 
>>
>> thank you very much!
>>
>>
>> 在 2015年2月26日,下午11:33,163  写道:
>>
>> Thank you very much for your opinion:)
>>
>> In our case, maybe it 's dangerous to treat un-observed item as negative
>> interaction(although we could give them small confidence, I think they are
>> still incredible...)
>>
>> I will do more experiments and give you feedback:)
>>
>> Thank you;)
>>
>>
>> 在 2015年2月26日,23:16,Sean Owen  写道:
>>
>> I believe that's right, and is what I was getting at. yes the implicit
>> formulation ends up implicitly including every possible interaction in
>> its loss function, even unobserved ones. That could be the difference.
>>
>> This is mostly an academic question though. In practice, you have
>> click-like data and should be using the implicit version for sure.
>>
>> However you can give negative implicit feedback to the model. You
>> could consider no-click as a mild, observed, negative interaction.
>> That is: supply a small negative value for these cases. Unobserved
>> pairs are not part of the data set. I'd be careful about assuming the
>> lack of an action carries signal.
>>
>> On Thu, Feb 26, 2015 at 3:07 PM, 163  wrote:
>> oh my god, I think I understood...
>> In my case, there are three kinds of user-item pairs:
>>
>> Display and click pair(positive pair)
>> Display but no-click pair(negative pair)
>> No-display pair(unobserved pair)
>>
>> Explicit ALS only consider the first and the second kinds
>> But implicit ALS consider all the three kinds of pair(and consider the
>> third
>> kind as the second pair, because their preference value are all zero and
>> confidence are all 1)
>>
>> So the result are different. right?
>>
>> Could you please give me some advice, which ALS should I use?
>> If I use the implicit ALS, how to distinguish the second and the third
>> kind
>> of pair:)
>>
>> My opinion is in my case, I should use explicit ALS ...
>>
>> Thank you so much
>>
>> 在 2015年2月26日,22:41,Xiangrui Meng  写道:
>>
>> Lisen, did you use all m-by-n pairs during training? Implicit model
>> penalizes unobserved ratings, while explicit model doesn't. -Xiangrui
>>
>> On Feb 26, 2015 6:26 AM, "Sean Owen"  wrote:
>>
>> +user
>>

joining multiple parquet files

2015-03-31 Thread roni
Hi ,
 I have 4 parquet files and I want to find data which is common in all of
them
e.g

SELECT TableA.*, TableB.*, TableC.*, TableD.* FROM (TableB INNER JOIN TableA
ON TableB.aID= TableA.aID)
INNER JOIN TableC ON(TableB.cID= Tablec.cID)
INNER JOIN TableA ta ON(ta.dID= TableD.dID)
WHERE (DATE(TableC.date)=date(now()))


I can do a two-file join like:  val joinedVal =
g1.join(g2, g1.col("kmer") === g2.col("kmer"))

But I am trying to find common kmer strings from 4 files.
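
For what it's worth, a chained-join sketch along these lines might express it,
assuming g1 through g4 are the DataFrames read from the four parquet files and
each has a "kmer" column:

val common = g1
  .join(g2, g1.col("kmer") === g2.col("kmer"))
  .join(g3, g1.col("kmer") === g3.col("kmer"))
  .join(g4, g1.col("kmer") === g4.col("kmer"))
  .select(g1.col("kmer"))   // only the kmer strings present in all four files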

Thanks

Roni


Re: Unable to save dataframe with UDT created with sqlContext.createDataFrame

2015-03-31 Thread Xiangrui Meng
I cannot reproduce this error on master, but I'm not aware of any
recent bug fixes that are related. Could you build and try the current
master? -Xiangrui

On Tue, Mar 31, 2015 at 4:10 AM, Jaonary Rabarisoa  wrote:
> Hi all,
>
> DataFrame with an user defined type (here mllib.Vector) created with
> sqlContex.createDataFrame can't be saved to parquet file and raise
> ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast
> to org.apache.spark.sql.Row error.
>
> Here is an example of code to reproduce this error :
>
> object TestDataFrame {
>
>   def main(args: Array[String]): Unit = {
> //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
> val conf = new
> SparkConf().setAppName("RankingEval").setMaster("local[8]")
>   .set("spark.executor.memory", "6g")
>
> val sc = new SparkContext(conf)
> val sqlContext = new SQLContext(sc)
>
> import sqlContext.implicits._
>
> val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
> val dataDF = data.toDF
>
> dataDF.save("test1.parquet")
>
> val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
>
> dataDF2.save("test2.parquet")
>   }
> }
>
>
> Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how
> can it be solved ?
>
>
> Cheers,
>
>
> Jao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Xiangrui Meng
Hey Sean,

That is true for explicit model, but not for implicit. The ALS-WR
paper doesn't cover the implicit model. In implicit formulation, a
sub-problem (for v_j) is:

min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2

This is a sum for all i but not just the users who rate item j. In
this case, if we set X=m_j, the number of observed ratings for item j,
it is not really scale invariant. We have #users user vectors in the
least squares problem but only penalize lambda * #ratings. I was
suggesting using lambda * m directly for implicit model to match the
number of vectors in the least squares problem. Well, this is my
theory. I don't find any public work about it.

Best,
Xiangrui

On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen  wrote:
> I had always understood the formulation to be the first option you
> describe. Lambda is scaled by the number of items the user has rated /
> interacted with. I think the goal is to avoid fitting the tastes of
> prolific users disproportionately just because they have many ratings
> to fit. This is what's described in the ALS-WR paper we link to on the
> Spark web site, in equation 5
> (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
>
> I think this also gets you the scale-invariance? For every additional
> rating from user i to product j, you add one new term to the
> squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
> regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at least
> both increasing about linearly as ratings increase. If the
> regularization term is multiplied by the total number of users and
> products in the model, then it's fixed.
>
> I might misunderstand you and/or be speaking about something slightly
> different when it comes to invariance. But FWIW I had always
> understood the regularization to be multiplied by the number of
> explicit ratings.
>
> On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng  wrote:
>> Okay, I didn't realize that I changed the behavior of lambda in 1.3.
>> to make it "scale-invariant", but it is worth discussing whether this
>> is a good change. In 1.2, we multiply lambda by the number ratings in
>> each sub-problem. This makes it "scale-invariant" for explicit
>> feedback. However, in implicit feedback model, a user's sub-problem
>> contains all item factors. Then the question is whether we should
>> multiply lambda by the number of explicit ratings from this user or by
>> the total number of items. We used the former in 1.2 but changed to
>> the latter in 1.3. So you should try a smaller lambda to get a similar
>> result in 1.3.
>>
>> Sean and Shuo, which approach do you prefer? Do you know any existing
>> work discussing this?
>>
>> Best,
>> Xiangrui
>>
>>
>> On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng  wrote:
>>> This sounds like a bug ... Did you try a different lambda? It would be
>>> great if you can share your dataset or re-produce this issue on the
>>> public dataset. Thanks! -Xiangrui
>>>
>>> On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody  wrote:
 After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
 smaller factors (and hence scores). For example, the first few product's
 factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
 first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
 difference of several orders of magnitude is consistent throughout both 
 user
 and product. The recommendations from 1.2.0 are subjectively much better
 than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less
 memory.

 My first thought is that there is too much regularization in the 1.3.0
 results, but I'm using the same lambda parameter value. This is a snippet 
 of
 my scala code:
 .
 val rank = 75
 val numIterations = 15
 val alpha = 10
 val lambda = 0.01
 val model = ALS.trainImplicit(train_data, rank, numIterations,
 lambda=lambda, alpha=alpha)
 .

 The code and input data are identical across both versions. Did anything
 change between the two versions I'm not aware of? I'd appreciate any help!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why "Shuffle Write" is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Bijay Pathak
Hi Udit,

The persisted RDDs in memory are cleared by Spark using an LRU policy, and you
can also set the time to clear the persisted RDDs and metadata by setting
spark.cleaner.ttl (default infinite). But I am not aware of any
properties to clean the older shuffle writes from disk.
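
For illustration, setting that property programmatically might look like this;
the 3600-second value is only an example:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cleaner.ttl", "3600") // seconds; default is infinite, as noted above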

thanks,
bijay

On Tue, Mar 31, 2015 at 1:50 PM, Udit Mehta  wrote:

> Thanks for the reply.
> This will reduce the shuffle write to disk to an extent but for a long
> running job(multiple days), the shuffle write would still occupy a lot of
> space on disk. Why do we need to store the data from older map tasks to
> memory?
>
> On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak 
> wrote:
>
>> The Spark Sort-Based Shuffle (default from 1.1) keeps the data from
>> each Map tasks to memory until they they can't fit after which they
>> are sorted and spilled to disk. You can reduce the Shuffle write to
>> disk by increasing spark.shuffle.memoryFraction(default 0.2).
>>
>> By writing the shuffle output to disk the Spark lineage can be
>> truncated when the RDDs are already materialized as the side-effects
>> of earlier shuffle.This is the under the hood optimization in Spark
>> which is only possible because of shuffle output output being written
>> to disk.
>>
>> You can set spark.shuffle.spill to false if you don't want to spill to
>> the disk and assuming you have enough heap memory.
>>
>> On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta  wrote:
>> > I have noticed a similar issue when using spark streaming. The spark
>> shuffle
>> > write size increases to a large size(in GB) and then the app crashes
>> saying:
>> > java.io.FileNotFoundException:
>> >
>> /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
>> > (No such file or directory)
>> >
>> > I dont understand why the shuffle size increases to such a large value
>> for
>> > long running jobs.
>> >
>> > Thanks,
>> > Udiy
>> >
>> > On Mon, Mar 30, 2015 at 4:01 AM, shahab 
>> wrote:
>> >>
>> >> Thanks Saisai. I will try your solution, but still i don't understand
>> why
>> >> filesystem should be used where there is a plenty of memory available!
>> >>
>> >>
>> >>
>> >> On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao 
>> >> wrote:
>> >>>
>> >>> Shuffle write will finally spill the data into file system as a bunch
>> of
>> >>> files. If you want to avoid disk write, you can mount a ramdisk and
>> >>> configure "spark.local.dir" to this ram disk. So shuffle output will
>> write
>> >>> to memory based FS, and will not introduce disk IO.
>> >>>
>> >>> Thanks
>> >>> Jerry
>> >>>
>> >>> 2015-03-30 17:15 GMT+08:00 shahab :
>> 
>>  Hi,
>> 
>>  I was looking at SparkUI, Executors, and I noticed that I have 597 MB
>>  for  "Shuffle while I am using cached temp-table and the Spark had 2
>> GB free
>>  memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!
>> 
>>  Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks
>> be
>>  done in memory?
>> 
>>  best,
>> 
>>  /Shahab
>> >>>
>> >>>
>> >>
>> >
>>
>
>


Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Sean Owen
Ah yeah I take your point. The squared error term is over the whole
user-item matrix, technically, in the implicit case. I suppose I am
used to assuming that the 0 terms in this matrix are weighted so much
less (because alpha is usually large-ish) that they're almost not
there, but they are. So I had just used the explicit formulation.

I suppose the result is kind of scale invariant, but not exactly. I
had not prioritized this property since I had generally built models
on the full data set and not a sample, and had assumed that lambda
would need to be retuned over time as the input grew anyway.

So, basically I don't know anything more than you do, sorry!

On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng  wrote:
> Hey Sean,
>
> That is true for explicit model, but not for implicit. The ALS-WR
> paper doesn't cover the implicit model. In implicit formulation, a
> sub-problem (for v_j) is:
>
> min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2
>
> This is a sum for all i but not just the users who rate item j. In
> this case, if we set X=m_j, the number of observed ratings for item j,
> it is not really scale invariant. We have #users user vectors in the
> least squares problem but only penalize lambda * #ratings. I was
> suggesting using lambda * m directly for implicit model to match the
> number of vectors in the least squares problem. Well, this is my
> theory. I don't find any public work about it.
>
> Best,
> Xiangrui
>
> On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen  wrote:
>> I had always understood the formulation to be the first option you
>> describe. Lambda is scaled by the number of items the user has rated /
>> interacted with. I think the goal is to avoid fitting the tastes of
>> prolific users disproportionately just because they have many ratings
>> to fit. This is what's described in the ALS-WR paper we link to on the
>> Spark web site, in equation 5
>> (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
>>
>> I think this also gets you the scale-invariance? For every additional
>> rating from user i to product j, you add one new term to the
>> squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
>> regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at least
>> both increasing about linearly as ratings increase. If the
>> regularization term is multiplied by the total number of users and
>> products in the model, then it's fixed.
>>
>> I might misunderstand you and/or be speaking about something slightly
>> different when it comes to invariance. But FWIW I had always
>> understood the regularization to be multiplied by the number of
>> explicit ratings.
>>
>> On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng  wrote:
>>> Okay, I didn't realize that I changed the behavior of lambda in 1.3.
>>> to make it "scale-invariant", but it is worth discussing whether this
>>> is a good change. In 1.2, we multiply lambda by the number ratings in
>>> each sub-problem. This makes it "scale-invariant" for explicit
>>> feedback. However, in implicit feedback model, a user's sub-problem
>>> contains all item factors. Then the question is whether we should
>>> multiply lambda by the number of explicit ratings from this user or by
>>> the total number of items. We used the former in 1.2 but changed to
>>> the latter in 1.3. So you should try a smaller lambda to get a similar
>>> result in 1.3.
>>>
>>> Sean and Shuo, which approach do you prefer? Do you know any existing
>>> work discussing this?
>>>
>>> Best,
>>> Xiangrui
>>>
>>>
>>> On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng  wrote:
 This sounds like a bug ... Did you try a different lambda? It would be
 great if you can share your dataset or re-produce this issue on the
 public dataset. Thanks! -Xiangrui

 On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody  wrote:
> After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
> smaller factors (and hence scores). For example, the first few product's
> factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
> first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
> difference of several orders of magnitude is consistent throughout both 
> user
> and product. The recommendations from 1.2.0 are subjectively much better
> than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less
> memory.
>
> My first thought is that there is too much regularization in the 1.3.0
> results, but I'm using the same lambda parameter value. This is a snippet 
> of
> my scala code:
> .
> val rank = 75
> val numIterations = 15
> val alpha = 10
> val lambda = 0.01
> val model = ALS.trainImplicit(train_data, rank, numIterations,
> lambda=lambda, alpha=alpha)
> .
>
> The code and input data are identical across both versions. Did anything
> change between the two ver

Re: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread Todd Nist
So in looking at this a bit more, I gather the root cause is the fact that
the nested fields are represented as rows within rows, is that correct?  If
I don't know the size of the json array (it varies), using
x.getAs[Row](0).getString(0) is not really a valid solution.

Is the solution to apply a lateral view + explode to this?

I have attempted to change to a lateral view, but it looks like my syntax is
off:

sqlContext.sql(
"SELECT path,`timestamp`, name, value, pe.value FROM metric
 lateral view explode(pathElements) a AS pe")
.collect.foreach(println(_))
Which results in:

15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread "main" java.lang.RuntimeException: [1.68] failure:
``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral
view explode(pathElements) a AS pe
   ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach?  Is this syntax available in 1.2.1:

SELECT
  v1.name, v2.city, v2.state
FROM people
  LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1
 as name, address
  LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2
 as city, state;


-Todd

On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist  wrote:

> I am accessing ElasticSearch via the elasticsearch-hadoop and attempting
> to expose it via SparkSQL. I am using spark 1.2.1, latest supported by
> elasticsearch-hadoop, and "org.elasticsearch" % "elasticsearch-hadoop" %
> "2.1.0.BUILD-SNAPSHOT" of elasticsearch-hadoop. I’m
> encountering an issue when I attempt to query the following json after
> creating a temporary table from it. The json looks like this:
>
> PUT /_template/device
> {
>   "template": "dev*",
>

spark.sql.Row manipulation

2015-03-31 Thread roni
I have 2 parquet files with the format e.g. name, age, town.
I read them and then join them to get all the names which are in both
towns.
The resultant dataset is

res4: Array[org.apache.spark.sql.Row] = Array([name1, age1,
town1,name2,age2,town2])

name1 and name2 are the same since I am joining on them.
Now I want to get it into the format (name, age1, age2).

But I can't seem to manipulate the spark.sql.Row.

Trying something like map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
does not work.

Can you suggest a way?

Thanks
-R


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Anny Chen
Thanks Petar  and Akhil for the suggestion.

Actually I changed the SPARK_MASTER_IP to the internal IP, deleted the
"export SPARK_PUBLIC_DNS=xx" line in the spark-env.sh, and also edited
the /etc/hosts as Akhil suggested, and now it is working! However I don't
know which change actually made it work.

Thanks!
Anny

On Tue, Mar 31, 2015 at 10:26 AM, Petar Zecevic 
wrote:

>
> Did you try setting the SPARK_MASTER_IP parameter in spark-env.sh?
>
>
>
> On 31.3.2015. 19:19, Anny Chen wrote:
>
> Hi Akhil,
>
>  I tried editing the /etc/hosts on the master and on the workers, and
> seems it is not working for me.
>
>  I tried adding   and it didn't work. I then tried
> adding   and it didn't work either. I guess I should
> also edit the spark-env.sh file?
>
>  Thanks!
> Anny
>
> On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das 
> wrote:
>
>>  You can add an internal ip to public hostname mapping in your
>> /etc/hosts file, if your forwarding is proper then it wouldn't be a problem
>> there after.
>>
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Tue, Mar 31, 2015 at 9:18 AM, anny9699  wrote:
>>
>>> Hi,
>>>
>>> For security reasons, we added a server between my aws Spark Cluster and
>>> local, so I couldn't connect to the cluster directly. To see the SparkUI
>>> and
>>> its related work's  stdout and stderr, I used dynamic forwarding and
>>> configured the SOCKS proxy. Now I could see the SparkUI using the
>>> internal
>>> ec2 ip, however when I click on the application UI (4040) or the
>>> worker's UI
>>> (8081), it still automatically uses the public DNS instead of internal
>>> ec2
>>> ip, which the browser now couldn't show.
>>>
>>> Is there a way that I could configure this? I saw that one could
>>> configure
>>> the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
>>> help. Does anyone experience the same issue?
>>>
>>> Thanks a lot!
>>> Anny
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>


deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Ankur Chauhan
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA1

Hi,

I am fairly new to the Spark ecosystem and I have been trying to set up
a Spark-on-Mesos deployment. I can't seem to figure out the "best
practices" around HDFS and Tachyon. The documentation's section on
Spark's data locality seems to suggest that each of my Mesos slave nodes
should also run an HDFS datanode. This seems fine, but I can't seem to
figure out how I would go about pushing Tachyon into the mix.

How should I organize my cluster?
Should Tachyon be colocated on my Mesos worker nodes, or should all
the Spark jobs reach out to a separate HDFS/Tachyon cluster?

- -- Ankur Chauhan
-BEGIN PGP SIGNATURE-

iQEcBAEBAgAGBQJVGy4bAAoJEOSJAMhvLp3L5bkH/0MECyZkh3ptWzmsNnSNfGWp
Oh93TUfD+foXO2ya9D+hxuyAxbjfXs/68aCWZsUT6qdlBQU9T1vX+CmPOnpY1KPN
NJP3af+VK0osaFPo6k28OTql1iTnvb9Nq+WDlohxBC/hZtoYl4cVxu8JmRlou/nb
/wfpp0ShmJnlxsoPa6mVdwzjUjVQAfEpuet3Ow5veXeA9X7S55k/h0ZQrZtO8eXL
jJsKaT8ne9WZPhZwA4PkdzTxkXF3JNveCIKPzNttsJIaLlvd0nLA/wu6QWmxskp6
iliGSmEk5P1zZWPPnk+TPIqbA0Ttue7PeXpSrbA9+pYiNT4R/wAneMvmpTABuR4=
=8ijP
-END PGP SIGNATURE-

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Haoyuan Li
Tachyon should be co-located with Spark in this case.

Best,

Haoyuan

On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan 
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA1
>
> Hi,
>
> I am fairly new to the spark ecosystem and I have been trying to setup
> a spark on mesos deployment. I can't seem to figure out the "best
> practices" around HDFS and Tachyon. The documentation about Spark's
> data-locality section seems to point that each of my mesos slave nodes
> should also run a hdfs datanode. This seems fine but I can't seem to
> figure out how I would go about pushing tachyon into the mix.
>
> How should i organize my cluster?
> Should tachyon be colocated on my mesos worker nodes? or should all
> the spark jobs reach out to a separate hdfs/tachyon cluster.
>
> - -- Ankur Chauhan
> -BEGIN PGP SIGNATURE-
>
> iQEcBAEBAgAGBQJVGy4bAAoJEOSJAMhvLp3L5bkH/0MECyZkh3ptWzmsNnSNfGWp
> Oh93TUfD+foXO2ya9D+hxuyAxbjfXs/68aCWZsUT6qdlBQU9T1vX+CmPOnpY1KPN
> NJP3af+VK0osaFPo6k28OTql1iTnvb9Nq+WDlohxBC/hZtoYl4cVxu8JmRlou/nb
> /wfpp0ShmJnlxsoPa6mVdwzjUjVQAfEpuet3Ow5veXeA9X7S55k/h0ZQrZtO8eXL
> jJsKaT8ne9WZPhZwA4PkdzTxkXF3JNveCIKPzNttsJIaLlvd0nLA/wu6QWmxskp6
> iliGSmEk5P1zZWPPnk+TPIqbA0Ttue7PeXpSrbA9+pYiNT4R/wAneMvmpTABuR4=
> =8ijP
> -END PGP SIGNATURE-
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/


Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Ankur Chauhan
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA1

Hi Haoyuan,

So on each mesos slave node I should allocate/section off some amount
of memory for tachyon (let's say 50% of the total memory) and the rest
for regular mesos tasks?

This means, on each slave node I would have tachyon worker (+ hdfs
configuration to talk to s3 or the hdfs datanode) and the mesos slave
process. Is this correct?

On 31/03/2015 16:43, Haoyuan Li wrote:
> Tachyon should be co-located with Spark in this case.
> 
> Best,
> 
> Haoyuan
> 
> On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan
> mailto:achau...@brightcove.com>> wrote:
> 
> Hi,
> 
> I am fairly new to the spark ecosystem and I have been trying to
> setup a spark on mesos deployment. I can't seem to figure out the
> "best practices" around HDFS and Tachyon. The documentation about
> Spark's data-locality section seems to point that each of my mesos
> slave nodes should also run a hdfs datanode. This seems fine but I
> can't seem to figure out how I would go about pushing tachyon into
> the mix.
> 
> How should i organize my cluster? Should tachyon be colocated on my
> mesos worker nodes? or should all the spark jobs reach out to a
> separate hdfs/tachyon cluster.
> 
> -- Ankur Chauhan
> 
> -
>
> 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>  For additional commands,
> e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- Haoyuan Li AMPLab, EECS, UC Berkeley 
> http://www.cs.berkeley.edu/~haoyuan/

- -- 
- -- Ankur Chauhan
-BEGIN PGP SIGNATURE-

iQEcBAEBAgAGBQJVGzKUAAoJEOSJAMhvLp3L3W4IAIVYiEKIZbC1a36/KWo94xYB
dvE4VXxF7z5FWmpuaHBEa+U1XWrR4cLVsQhocusOFn+oC7bstdltt3cGNAuwFSv6
Oogs4Sl1J4YZm8omKVdCkwD6Hv71HSntM8llz3qTW+Ljk2aKhfvNtp5nioQAm3e+
bs4ZKlCBij/xV3LbYYIePSS3lL0d9m1qEDJvi6jFcfm3gnBYeNeL9x92B5ylyth0
BGHnPN4sV/yopgrqOimLb12gSexHGNP1y6JBYy8NrHRY8SxkZ4sWKuyDnGDCOPOc
HC14Parf5Ly5FEz5g5WjF6HrXRdPlgr2ABxSLWOAB/siXsX9o/4yCy7NtDNcL6Y=
=f2xI
-END PGP SIGNATURE-

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Query REST web service with Spark?

2015-03-31 Thread Todd Nist
Here are a few ways to achieve what you're looking to do:

https://github.com/cjnolet/spark-jetty-server

Spark Job Server - https://github.com/spark-jobserver/spark-jobserver -

defines a REST API for Spark

Hue -

http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/

Spark Kernel project: https://github.com/ibm-et/spark-kernel

> The Spark Kernel's goal is to serve as the foundation for interactive
> applications. The project provides a client library in Scala that abstracts
> connecting to the kernel (containing a Spark Context), which can be
> embedded into a web application. We demonstrated this at StrataConf when we
> embedded the Spark Kernel client into a Play application to provide an
> interactive web application that communicates to Spark via the Spark Kernel
> (hosting a SparkContext).


Hopefully one of those will give you what you're looking for.

-Todd

On Tue, Mar 31, 2015 at 5:06 PM, Burak Yavuz  wrote:

> Hi,
>
> If I recall correctly, I've read people integrating REST calls to Spark
> Streaming jobs in the user list. I don't imagine any cases for why it
> shouldn't be possible.
>
> Best,
> Burak
>
> On Tue, Mar 31, 2015 at 1:46 PM, Minnow Noir  wrote:
>
>> We have have some data on Hadoop that needs augmented with data only
>> available to us via a REST service.  We're using Spark to search for, and
>> correct, missing data. Even though there are a lot of records to scour for
>> missing data, the total number of calls to the service is expected to be
>> low, so it would be ideal to do the whole job in Spark as we scour the data.
>>
>> I don't see anything obvious in the API or on Google relating to making
>> REST calls from a Spark job.  Is it possible?
>>
>> Thanks,
>>
>> Alec
>>
>
>


Re: Query REST web service with Spark?

2015-03-31 Thread Ashish Rangole
All you need is a client to the target REST service in your Spark task. It
could be as simple as an HttpClient. Most likely that client won't be
serializable, in which case you initialize it lazily. There are useful
examples in the Spark knowledge base gitbook that you can look at.
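
A minimal sketch of that pattern, using plain HttpURLConnection inside
mapPartitions; the endpoint URL and recordsRdd are hypothetical:

import java.net.{HttpURLConnection, URL}
import scala.io.Source

// recordsRdd: RDD[String] of keys that need augmenting
val augmented = recordsRdd.mapPartitions { keys =>
  // any heavier, non-serializable client would be created here, once per partition
  keys.map { key =>
    val conn = new URL("http://rest.example.com/lookup/" + key)
      .openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("GET")
      (key, Source.fromInputStream(conn.getInputStream).mkString)
    } finally conn.disconnect()
  }
}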
On Mar 31, 2015 1:48 PM, "Minnow Noir"  wrote:

We have have some data on Hadoop that needs augmented with data only
available to us via a REST service.  We're using Spark to search for, and
correct, missing data. Even though there are a lot of records to scour for
missing data, the total number of calls to the service is expected to be
low, so it would be ideal to do the whole job in Spark as we scour the data.

I don't see anything obvious in the API or on Google relating to making
REST calls from a Spark job.  Is it possible?

Thanks,

Alec


RE: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread java8964
You can use HiveContext instead of SQLContext, which should support all of
HiveQL, including LATERAL VIEW explode.
SQLContext does not support that yet.
BTW, nice coding format in the email.
Yong
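
A minimal sketch of that suggestion, assuming the ES-backed data has been
registered as a temp table named "metric" as in the original query:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// e.g. metricDF.registerTempTable("metric") beforehand
hiveContext.sql(
  """SELECT path, `timestamp`, name, value, pe.value
    |FROM metric
    |LATERAL VIEW explode(pathElements) a AS pe""".stripMargin)
  .collect()
  .foreach(println)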

Date: Tue, 31 Mar 2015 18:18:19 -0400
Subject: Re: SparkSql - java.util.NoSuchElementException: key not found: node 
when access JSON Array
From: tsind...@gmail.com
To: user@spark.apache.org

So in looking at this a bit more, I gather the root cause is the fact that the 
nested fields are represented as rows within rows, is that correct?  If I don't 
know the size of the json array (it varies), using x.getAs[Row](0).getString(0) 
is not really a valid solution.  
Is the solution to apply a lateral view + explode to this? 
I have attempted to change to a lateral view, but looks like my syntax is off:








sqlContext.sql(
"SELECT path,`timestamp`, name, value, pe.value FROM metric 
 lateral view explode(pathElements) a AS pe")
.collect.foreach(println(_))
Which results in:
15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread "main" java.lang.RuntimeException: [1.68] failure: 
``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral view 
explode(pathElements) a AS pe
   ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Is this the right approach?  Is this syntax available in 1.2.1:
SELECT
  v1.name, v2.city, v2.state 
FROM people
  LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 
 as name, address
  LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2
 as city, state;
-Todd
On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist  wrote:
I am accessing ElasticSearch via the elasticsearch-hadoop and at

Re: spark.sql.Row manipulation

2015-03-31 Thread Michael Armbrust
You can do something like:

df.collect().map {
  case Row(name: String, age1: Int, age2: Int) => ...
}
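
A slightly fuller sketch of the same idea, assuming the joined rows come back as
(name1, age1, town1, name2, age2, town2) as in the original question; Row must be
imported for the pattern match:

import org.apache.spark.sql.Row

// joined: the DataFrame produced by the join described earlier in the thread
val namesAndAges = joined.collect().map {
  case Row(name1: String, age1: Int, _: String, _: String, age2: Int, _: String) =>
    (name1, age1, age2) // keep only (name, age1, age2)
}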

On Tue, Mar 31, 2015 at 4:05 PM, roni  wrote:

> I have 2 paraquet files with format e.g  name , age, town
> I read them  and then join them to get  all the names which are in both
> towns  .
> the resultant dataset is
>
> res4: Array[org.apache.spark.sql.Row] = Array([name1, age1,
> town1,name2,age2,town2])
>
> Name 1 and name 2 are same as I am joining .
> Now , I want to get only to the format (name , age1, age2)
>
> But I cant seem to getting to manipulate the spark.sql.row.
>
> Trying something like map(_.split (",")).map(a=> (a(0),
> a(1).trim().toInt)) does not work .
>
> Can you suggest a way ?
>
> Thanks
> -R
>
>


Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
In my experiment, if I do not call gc() explicitly, the shuffle files will not 
be cleaned until the whole job finishes… I don’t know why; maybe the RDDs could 
not be GCed implicitly.
In my situation, a full GC in the driver takes about 10 seconds, so I start a 
thread in the driver to do GC like this (GC every 120 seconds):

while (true) {
  System.gc();
  Thread.sleep(120 * 1000);
}
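
A sketch of the same idea wrapped in a daemon thread, so the loop above does not
block the main driver thread:

val gcThread = new Thread(new Runnable {
  override def run(): Unit = {
    while (true) {
      System.gc()              // nudge the JVM so the ContextCleaner can drop old shuffle files
      Thread.sleep(120 * 1000) // every 120 seconds, as described above
    }
  }
})
gcThread.setDaemon(true)
gcThread.start()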


it works well now.
Do you have more elegant ways to clean the shuffle files?

Best Regards,
Sendong Li



> 在 2015年4月1日,上午5:09,Xiangrui Meng  写道:
> 
> Hey Guoqiang and Sendong,
> 
> Could you comment on the overhead of calling gc() explicitly? The shuffle 
> files should get cleaned in a few seconds after checkpointing, but it is 
> certainly possible to accumulates TBs of files in a few seconds. In this 
> case, calling gc() may work the same as waiting for a few seconds after each 
> checkpoint. Is it correct?
> 
> Best,
> Xiangrui
> 
> On Tue, Mar 31, 2015 at 8:58 AM, lisendong  > wrote:
> guoqiang ’s method works very well …
> 
> it only takes 1TB disk now.
> 
> thank you very much!
> 
> 
> 
>> 在 2015年3月31日,下午4:47,GuoQiang Li mailto:wi...@qq.com>> 写道:
>> 
>> You can try to enforce garbage collection:
>> 
>> /** Run GC and make sure it actually has run */
>> def runGC() {
>>   val weakRef = new WeakReference(new Object())
>>   val startTime = System.currentTimeMillis
>>   System.gc() // Make a best effort to run the garbage collection. It 
>> *usually* runs GC.
>>   // Wait until a weak reference object has been GCed
>>   System.runFinalization()
>>   while (weakRef.get != null) {
>> System.gc()
>> System.runFinalization()
>> Thread.sleep(200)
>> if (System.currentTimeMillis - startTime > 1) {
>>   throw new Exception("automatically cleanup error")
>> }
>>   }
>> }
>> 
>> 
>> -- 原始邮件 --
>> 发件人: "lisendong"mailto:lisend...@163.com>>; 
>> 发送时间: 2015年3月31日(星期二) 下午3:47
>> 收件人: "Xiangrui Meng"mailto:men...@gmail.com>>; 
>> 抄送: "Xiangrui Meng"mailto:m...@databricks.com>>; 
>> "user"mailto:user@spark.apache.org>>; "Sean 
>> Owen"mailto:so...@cloudera.com>>; "GuoQiang 
>> Li"mailto:wi...@qq.com>>; 
>> 主题: Re: different result from implicit ALS with explicit ALS
>> 
>> I have update my spark source code to 1.3.1.
>> 
>> the checkpoint works well. 
>> 
>> BUT the shuffle data still could not be delete automatically…the disk usage 
>> is still 30TB…
>> 
>> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
>> 
>> Do you know how to solve my problem?
>> 
>> Sendong Li
>> 
>> 
>> 
>>> 在 2015年3月31日,上午12:11,Xiangrui Meng >> > 写道:
>>> 
>>> setCheckpointInterval was added in the current master and branch-1.3. 
>>> Please help check whether it works. It will be included in the 1.3.1 and 
>>> 1.4.0 release. -Xiangrui
>>> 
>>> On Mon, Mar 30, 2015 at 7:27 AM, lisendong >> > wrote:
>>> hi, xiangrui:
>>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>>> the code is :
>>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>>>  
>>> 
>>> 
>>> 
>>> the checkpoint is very important in my situation, because my task will 
>>> produce 1TB shuffle data in each iteration, it the shuffle data is not 
>>> deleted in each iteration(using checkpoint()), the task will produce 30TB 
>>> data…
>>> 
>>> 
>>> So I change the ALS code, and re-compile by myself, but it seems the 
>>> checkpoint does not take effects, and the task still occupy 30TB disk… ( I 
>>> only add two lines to the ALS.scala) :
>>> 
>>> 
>>> 
>>> 
>>> 
>>> and the driver’s log seems strange, why the log is printed together...
>>> 
>>> 
>>> thank you very much!
>>> 
>>> 
 在 2015年2月26日,下午11:33,163 mailto:lisend...@163.com>> 写道:
 
 Thank you very much for your opinion:)
 
 In our case, maybe it 's dangerous to treat un-observed item as negative 
 interaction(although we could give them small confidence, I think they are 
 still incredible...)
 
 I will do more experiments and give you feedback:)
 
 Thank you;)
 
 
> 在 2015年2月26日,23:16,Sean Owen  > 写道:
> 
> I believe that's right, and is what I was getting at. yes the implicit
> formulation ends up implicitly including every possible interaction in
> its loss function, even unobserved ones. That could be the difference.
> 
> This is mostly an academic question though. In practice, you have
> click-like data and should be using the implicit version for sure.
> 
> However you can give negative implicit feedback to the model. You
> could consider no-click as a mild, observed, negative interaction.
> That is: s

Re: Can't run spark-submit with an application jar on a Mesos cluster

2015-03-31 Thread seglo
Thanks hbogert.  There it is plain as day; it can't find my Spark binaries.
I thought it was enough to set SPARK_EXECUTOR_URI in my spark-env.sh, since
that is all that's necessary to run spark-shell against a Mesos master,
but I also had to set spark.executor.uri in my spark-defaults.conf (or in my
app itself).  Thanks again for your help troubleshooting this problem.
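
For reference, a sketch of where the setting described above can live; the URI is illustrative and only needs to be reachable by the Mesos slaves:

# conf/spark-defaults.conf
spark.executor.uri   hdfs://namenode:8020/dist/spark-1.3.0-bin-hadoop2.4.tgz

# or, equivalently, in the application itself (Scala):
# conf.set("spark.executor.uri", "hdfs://namenode:8020/dist/spark-1.3.0-bin-hadoop2.4.tgz")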

jclouds@development-5159-d3d:/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/1/runs/latest$
cat stderr
I0329 20:34:26.107267 10026 exec.cpp:132] Version: 0.21.1
I0329 20:34:26.109591 10031 exec.cpp:206] Executor registered on slave
20150322-040336-606645514-5050-2744-S1
sh: 1: /home/jclouds/spark-1.3.0-bin-hadoop2.4/bin/spark-class: not found
jclouds@development-5159-d3d:/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/1/runs/latest$
cat stdout
Registered executor on 10.217.7.180
Starting task 1
Forked command at 10036
sh -c ' "/home/jclouds/spark-1.3.0-bin-hadoop2.4/bin/spark-class"
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@development-5159-d9.c.learning-spark.internal:54746/user/CoarseGrainedScheduler
--executor-id 20150322-040336-606645514-5050-2744-S1 --hostname 10.217.7.180
--cores 10 --app-id 20150322-040336-606645514-5050-2744-0037'
Command exited with status 127 (pid: 10036)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-run-spark-submit-with-an-application-jar-on-a-Mesos-cluster-tp22277p22331.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Haoyuan Li
Ankur,

Response inline.

On Tue, Mar 31, 2015 at 4:49 PM, Ankur Chauhan 
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA1
>
> Hi Haoyuan,
>
> So on each mesos slave node I should allocate/section off some amount
> of memory for tachyon (let's say 50% of the total memory) and the rest
> for regular mesos tasks?
>
>
This depends on your machine spec and workload. The high-level idea is to
give Tachyon a memory size equal to the total memory of the machine
minus the memory needs of the other processes.
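
As a rough illustration of that sizing rule (all numbers hypothetical): on a 64 GB slave that reserves about 8 GB for the OS, Mesos slave and HDFS datanode, and about 24 GB for executor memory, roughly 32 GB could be handed to the Tachyon worker, e.g. in conf/tachyon-env.sh (variable name as used by Tachyon releases of that era):

# conf/tachyon-env.sh -- illustrative value
export TACHYON_WORKER_MEMORY_SIZE=32GB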



> This means, on each slave node I would have tachyon worker (+ hdfs
> configuration to talk to s3 or the hdfs datanode) and the mesos slave
> process. Is this correct?
>


On each slave node, you would run a Tachyon worker. For underfs, you can
configure it to use S3 or HDFS or others.

Best,

Haoyuan


>
> On 31/03/2015 16:43, Haoyuan Li wrote:
> > Tachyon should be co-located with Spark in this case.
> >
> > Best,
> >
> > Haoyuan
> >
> > On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan
> > <achau...@brightcove.com> wrote:
> >
> > Hi,
> >
> > I am fairly new to the spark ecosystem and I have been trying to
> > setup a spark on mesos deployment. I can't seem to figure out the
> > "best practices" around HDFS and Tachyon. The documentation about
> > Spark's data-locality section seems to point that each of my mesos
> > slave nodes should also run a hdfs datanode. This seems fine but I
> > can't seem to figure out how I would go about pushing tachyon into
> > the mix.
> >
> > How should I organize my cluster? Should Tachyon be colocated on my
> > mesos worker nodes? Or should all the Spark jobs reach out to a
> > separate HDFS/Tachyon cluster?
> >
> > -- Ankur Chauhan
> >
> > -
> >
> >
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >  For additional commands,
> > e-mail: user-h...@spark.apache.org
> > 
> >
> >
> >
> >
> > -- Haoyuan Li AMPLab, EECS, UC Berkeley
> > http://www.cs.berkeley.edu/~haoyuan/
>
> - --
> - -- Ankur Chauhan
> -BEGIN PGP SIGNATURE-
>
> iQEcBAEBAgAGBQJVGzKUAAoJEOSJAMhvLp3L3W4IAIVYiEKIZbC1a36/KWo94xYB
> dvE4VXxF7z5FWmpuaHBEa+U1XWrR4cLVsQhocusOFn+oC7bstdltt3cGNAuwFSv6
> Oogs4Sl1J4YZm8omKVdCkwD6Hv71HSntM8llz3qTW+Ljk2aKhfvNtp5nioQAm3e+
> bs4ZKlCBij/xV3LbYYIePSS3lL0d9m1qEDJvi6jFcfm3gnBYeNeL9x92B5ylyth0
> BGHnPN4sV/yopgrqOimLb12gSexHGNP1y6JBYy8NrHRY8SxkZ4sWKuyDnGDCOPOc
> HC14Parf5Ly5FEz5g5WjF6HrXRdPlgr2ABxSLWOAB/siXsX9o/4yCy7NtDNcL6Y=
> =f2xI
> -END PGP SIGNATURE-
>



-- 
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/


Minimum slots assignment to Spark on Mesos

2015-03-31 Thread Stratos Dimopoulos
Hi All,

I am running Spark & MR on Mesos. Is there a configuration setting for
Spark to define the minimum required slots (similar to MapReduce's
mapred.mesos.total.reduce.slots.minimum and
mapred.mesos.total.map.slots.minimum)? The most closely related property I see
is spark.scheduler.minRegisteredResourcesRatio, found in the documentation here:
http://spark.apache.org/docs/1.2.1/configuration.html#spark-properties
What I want is a fixed number of trackers assigned to Spark so I
can share my cluster with MR (see the sketch after this message). Any suggestion
on parts of code or documentation that I should check for a full list of
available configurations?

thanks,
Stratos
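
A sketch of the closest knobs mentioned above, as they might appear in conf/spark-defaults.conf (values illustrative; none of these is an exact equivalent of the MapReduce minimum-slots settings):

# conf/spark-defaults.conf
spark.cores.max                                    40      # cap the cores Spark grabs so MR keeps the rest (coarse-grained mode)
spark.scheduler.minRegisteredResourcesRatio        0.8     # wait until 80% of requested resources have registered
spark.scheduler.maxRegisteredResourcesWaitingTime  30000   # ...but start anyway after 30s (milliseconds in 1.2.x)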


Spark SQL saveAsParquet failed after a few waves

2015-03-31 Thread Yijie Shen
Hi,

I am using spark-1.3 prebuilt release with hadoop2.4 support and Hadoop 2.4.0.

I wrote a Spark application (LoadApp) to generate data in each task and load the
data into HDFS as Parquet files (using saveAsParquet() in Spark SQL).

When only a few waves (1 or 2) are used in a job, LoadApp can finish after a few
failures and retries.
But when more waves (3) are involved in a job, the job terminates
abnormally.

All the failures I see are:
"java.io.IOException: The file being written is in an invalid state. Probably
caused by an error thrown previously. Current state: COLUMN"

and the stacktraces  are:

java.io.IOException: The file being written is in an invalid state. Probably 
caused by an error thrown previously. Current state: COLUMN
at 
parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at 
parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at 
parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at 
parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at 
parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:634)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:648)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:648)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


I have no idea what happened, since jobs may fail or succeed without any apparent reason.

Thanks.


Yijie Shen

Re: Actor not found

2015-03-31 Thread Shixiong Zhu
Thanks for the log. It's really helpful. I created a JIRA to explain why it
can happen: https://issues.apache.org/jira/browse/SPARK-6640

However, does this error always happen in your environment?

Best Regards,
Shixiong Zhu

2015-03-31 22:36 GMT+08:00 sparkdi :

> This is the whole output from the shell:
>
> ~/spark-1.3.0-bin-hadoop2.4$ sudo bin/spark-shell
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> log4j:WARN No appenders could be found for logger
> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/03/30 19:00:40 INFO SecurityManager: Changing view acls to: root
> 15/03/30 19:00:40 INFO SecurityManager: Changing modify acls to: root
> 15/03/30 19:00:40 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view pe
> rmissions: Set(root); users with modify permissions: Set(root)
> 15/03/30 19:00:40 INFO HttpServer: Starting HTTP Server
> 15/03/30 19:00:40 INFO Server: jetty-8.y.z-SNAPSHOT
> 15/03/30 19:00:40 INFO AbstractConnector: Started
> SocketConnector@0.0.0.0:47797
> 15/03/30 19:00:40 INFO Utils: Successfully started service 'HTTP class
> server' on port 47797.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
>       /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 15/03/30 19:00:42 INFO SparkContext: Running Spark version 1.3.0
> 15/03/30 19:00:42 INFO SecurityManager: Changing view acls to: root
> 15/03/30 19:00:42 INFO SecurityManager: Changing modify acls to: root
> 15/03/30 19:00:42 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view pe
> rmissions: Set(root); users with modify permissions: Set(root)
> 15/03/30 19:00:42 INFO Slf4jLogger: Slf4jLogger started
> 15/03/30 19:00:42 INFO Remoting: Starting remoting
> 15/03/30 19:00:43 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@vm:52574]
> 15/03/30 19:00:43 INFO Utils: Successfully started service 'sparkDriver' on
> port 52574.
> 15/03/30 19:00:43 INFO SparkEnv: Registering MapOutputTracker
> 15/03/30 19:00:43 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/30 19:00:43 INFO DiskBlockManager: Created local directory at
> /tmp/spark-f71a8d86-6e49-4dfe-bb98-8e8581015acc/bl
> ockmgr-57532f5a-38db-4ba3-86d8-edef84f592e5
> 15/03/30 19:00:43 INFO MemoryStore: MemoryStore started with capacity 265.4
> MB
> 15/03/30 19:00:43 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-95e0a143-0de3-4c96-861c-968c9fae2746/h
> ttpd-cb029cd6-4943-479d-9b56-e7397489d9ea
> 15/03/30 19:00:43 INFO HttpServer: Starting HTTP Server
> 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
> 15/03/30 19:00:43 INFO AbstractConnector: Started
> SocketConnector@0.0.0.0:48500
> 15/03/30 19:00:43 INFO Utils: Successfully started service 'HTTP file
> server' on port 48500.
> 15/03/30 19:00:43 INFO SparkEnv: Registering OutputCommitCoordinator
> 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
> 15/03/30 19:00:43 INFO AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
> 15/03/30 19:00:43 INFO Utils: Successfully started service 'SparkUI' on
> port
> 4040.
> 15/03/30 19:00:43 INFO SparkUI: Started SparkUI at http://vm:4040
> 15/03/30 19:00:43 INFO Executor: Starting executor ID  on host
> localhost
> 15/03/30 19:00:43 INFO Executor: Using REPL class URI:
> http://10.11.204.80:47797
> 15/03/30 19:00:43 INFO AkkaUtils: Connecting to HeartbeatReceiver:
> akka.tcp://sparkDriver@vm:5
> 2574/user/HeartbeatReceiver
> 15/03/30 19:00:43 ERROR OneForOneStrategy: Actor not found for:
> ActorSelection[Anchor(akka://sparkDriver/deadLetters),
> Path(/)]
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
> at akka.actor.ActorCell.create(ActorCell.scala:596)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWor

Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Xiangrui Meng
I created a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have
a clear answer about how the scaling should be handled, maybe the best
solution for now is to switch back to the 1.2 scaling. -Xiangrui

On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen  wrote:
> Ah yeah I take your point. The squared error term is over the whole
> user-item matrix, technically, in the implicit case. I suppose I am
> used to assuming that the 0 terms in this matrix are weighted so much
> less (because alpha is usually large-ish) that they're almost not
> there, but they are. So I had just used the explicit formulation.
>
> I suppose the result is kind of scale invariant, but not exactly. I
> had not prioritized this property since I had generally built models
> on the full data set and not a sample, and had assumed that lambda
> would need to be retuned over time as the input grew anyway.
>
> So, basically I don't know anything more than you do, sorry!
>
> On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng  wrote:
>> Hey Sean,
>>
>> That is true for explicit model, but not for implicit. The ALS-WR
>> paper doesn't cover the implicit model. In implicit formulation, a
>> sub-problem (for v_j) is:
>>
>> min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2
>>
>> This is a sum for all i but not just the users who rate item j. In
>> this case, if we set X=m_j, the number of observed ratings for item j,
>> it is not really scale invariant. We have #users user vectors in the
>> least squares problem but only penalize lambda * #ratings. I was
>> suggesting using lambda * m directly for implicit model to match the
>> number of vectors in the least squares problem. Well, this is my
>> theory. I don't find any public work about it.
>>
>> Best,
>> Xiangrui
>>
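
Restating the thread's notation in one place, the item sub-problem under discussion is

\min_{v_j} \; \sum_{i=1}^{m} c_{ij}\,\bigl(p_{ij} - u_i^{\top} v_j\bigr)^{2} \;+\; \lambda \, X \, \lVert v_j \rVert_2^2,
\qquad
X = \begin{cases}
  n_j & \text{observed ratings of item } j \text{ (the 1.2 scaling)} \\
  m   & \text{total number of users (the 1.3 scaling)}
\end{cases}

where the sum runs over all m users because the implicit loss also includes unobserved pairs.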
>> On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen  wrote:
>>> I had always understood the formulation to be the first option you
>>> describe. Lambda is scaled by the number of items the user has rated /
>>> interacted with. I think the goal is to avoid fitting the tastes of
>>> prolific users disproportionately just because they have many ratings
>>> to fit. This is what's described in the ALS-WR paper we link to on the
>>> Spark web site, in equation 5
>>> (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
>>>
>>> I think this also gets you the scale-invariance? For every additional
>>> rating from user i to product j, you add one new term to the
>>> squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
>>> regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at least
>>> both increasing about linearly as ratings increase. If the
>>> regularization term is multiplied by the total number of users and
>>> products in the model, then it's fixed.
>>>
>>> I might misunderstand you and/or be speaking about something slightly
>>> different when it comes to invariance. But FWIW I had always
>>> understood the regularization to be multiplied by the number of
>>> explicit ratings.
>>>
>>> On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng  wrote:
 Okay, I didn't realize that I changed the behavior of lambda in 1.3.
 to make it "scale-invariant", but it is worth discussing whether this
 is a good change. In 1.2, we multiply lambda by the number ratings in
 each sub-problem. This makes it "scale-invariant" for explicit
 feedback. However, in implicit feedback model, a user's sub-problem
 contains all item factors. Then the question is whether we should
 multiply lambda by the number of explicit ratings from this user or by
 the total number of items. We used the former in 1.2 but changed to
 the latter in 1.3. So you should try a smaller lambda to get a similar
 result in 1.3.

 Sean and Shuo, which approach do you prefer? Do you know any existing
 work discussing this?

 Best,
 Xiangrui


 On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng  wrote:
> This sounds like a bug ... Did you try a different lambda? It would be
> great if you can share your dataset or re-produce this issue on the
> public dataset. Thanks! -Xiangrui
>
> On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody  wrote:
>> After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
>> smaller factors (and hence scores). For example, the first few products'
>> factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
>> first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
>> difference of several orders of magnitude is consistent throughout both 
>> user
>> and product. The recommendations from 1.2.0 are subjectively much better
>> than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses 
>> less
>> memory.
>>
>> My first thought is that there is too much regularization in the 1.3.0
>> results, but I'm using the same lambda paramete

Anatomy of RDD : Deep dive into RDD data structure

2015-03-31 Thread madhu phatak
Hi,
 Recently I gave a talk on the RDD data structure which gives an in-depth
understanding of Spark internals. You can watch it on YouTube; the slides are
on SlideShare and the code is on GitHub.



Regards,
Madhukara Phatak
http://datamantra.io/


Creating Partitioned Parquet Tables via SparkSQL

2015-03-31 Thread Denny Lee
Creating Parquet tables via .saveAsTable is great, but I was wondering if
there is an equivalent way to create partitioned Parquet tables.

Thanks!


Re: Broadcasting a parquet file using spark and python

2015-03-31 Thread Jitesh chandra Mishra
Hi Michael,

Thanks for your response. I am running 1.2.1.

Is there any workaround to achieve the same with 1.2.1?

Thanks,
Jitesh

On Wed, Apr 1, 2015 at 12:25 AM, Michael Armbrust 
wrote:

> In Spark 1.3 I would expect this to happen automatically when the parquet
> table is small (< 10mb, configurable with 
> spark.sql.autoBroadcastJoinThreshold).
> If you are running 1.3 and not seeing this, can you show the code you are
> using to create the table?
>
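
For the 1.3 path described above, the threshold can also be adjusted at runtime; a sketch (Scala shown, value illustrative; the same SET statement works from Python via sqlContext.sql(...)):

// Raise the broadcast-join size cutoff to roughly 100 MB for this session.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")
// or equivalently:
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "104857600")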
> On Tue, Mar 31, 2015 at 3:25 AM, jitesh129  wrote:
>
>> How can we implement a BroadcastHashJoin for spark with python?
>>
>> My SparkSQL inner joins are taking a lot of time since they are performing a
>> ShuffledHashJoin.
>>
>> Tables on which join is performed are stored as parquet files.
>>
>> Please help.
>>
>> Thanks and regards,
>> Jitesh
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


rdd.cache() not working ?

2015-03-31 Thread fightf...@163.com
Hi, all

Running the following code snippet through spark-shell, I cannot see any
cached storage partitions in the web UI.

Does this mean that the cache is not working? Because if we issue person.count
again, we cannot see any improvement in the time it takes.

Hope anyone can explain this a little.

Best,

Sun.

   case class Person(id: Int, col1: String)

   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt")
     .map(_.split(","))
     .map(p => Person(p(0).trim.toInt, p(1)))

   person.cache

   person.count



fightf...@163.com


Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Sean Bigdatafun
(resending...)

I was thinking of the same setup… But the more I think about this problem,
the more interesting it becomes.

If we allocate 50% of total memory to Tachyon statically, then the Mesos
benefits of dynamically scheduling resources go away altogether.

Can Tachyon be resource-managed by Mesos (dynamically)? Any thoughts or
comments?

Sean
>
>
>
>
>
> >Hi Haoyuan,
>
> >So on each mesos slave node I should allocate/section off some amount
> >of memory for tachyon (let's say 50% of the total memory) and the rest
> >for regular mesos tasks?
>
> >This means, on each slave node I would have tachyon worker (+ hdfs
> >configuration to talk to s3 or the hdfs datanode) and the mesos slave
> >process. Is this correct?
>
>
>


-- 
--Sean


Re: rdd.cache() not working ?

2015-03-31 Thread Taotao.Li
Rerun person.count and you will see the performance benefit of the cache.

person.cache does not cache the RDD right away. The RDD is actually cached after
the first action [person.count here].
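
A minimal way to observe this from the shell (timings are indicative only; person is the RDD from the snippet quoted below):

person.cache()                         // only marks the RDD as cacheable; nothing is stored yet
person.count()                         // first action: reads from HDFS and fills the cache
person.count()                         // second action: should now hit the cached partitions and return faster
println(person.getStorageLevel)        // MEMORY_ONLY once cache() has been called
sc.getRDDStorageInfo.foreach(println)  // entries appear only after partitions are actually cached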

- Original Message -

From: fightf...@163.com 
To: "user"  
Sent: Wednesday, April 1, 2015, 1:21:25 PM 
Subject: rdd.cache() not working ? 

Hi, all 

Running the following code snippet through spark-shell, however cannot see any 
cached storage partitions in web ui. 

Does this mean that cache now working ? Cause if we issue person.count again 
that we cannot say any time consuming 

performance upgrading. Hope anyone can explain this for a little. 

Best, 

Sun. 

case class Person(id: Int, col1: String) 

val person = 
sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p
 => Person(p(0).trim.toInt, p(1))) 
person.cache 
person.count 


fightf...@163.com 



-- 


--- 

Thanks & Best regards 

李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer 

Address: Wanxiang Tower 8F, Lujiazui West Rd. No. 99, Pudong New District, 
Shanghai, 200120 

Phone: 021-60216502   Mobile: +86-18202171279 



SparkStreaming batch processing time question

2015-03-31 Thread luohui20001
hi guys:
  I got a question when reading 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval.
 
 What will happen to the streaming data if the batch processing time is 
bigger than the batch interval? Will the next batch of data be delayed, or will 
the unfinished processing job be discarded?
 
Thanks for any ideas shared.





 
Thanks&Best regards!
罗辉 San.Luo


Re: When do map how to get the line number?

2015-03-31 Thread jitesh129
You can use zipWithIndex() to get an index for each record and then
increment it by 1 for each index.

val tf = sc.textFile("test").zipWithIndex()    // RDD[(String, Long)]: (line, 0-based index)
tf.map { case (line, idx) => (idx + 1, line) }

The above should serve your purpose.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/When-do-map-how-to-get-the-line-number-tp22318p22334.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using 'fair' scheduler mode

2015-03-31 Thread Raghavendra Pandey
I am facing the same issue. FAIR and FIFO are behaving in the same way.

On Wed, Apr 1, 2015 at 1:49 AM, asadrao  wrote:

> Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the
> first query is a very expensive query (ex: ‘select *’ on a really big data
> set) than any subsequent query seem to get blocked. I would have expected
> the second query to run in parallel since I am using the ‘fair’ scheduler
> mode not the ‘fifo’. I am submitting the query through thrift server.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkStreaming batch processing time question

2015-03-31 Thread Akhil Das
It will add scheduling delay for the new batch. The new batch's data will be
processed after the previous batch finishes. When the delay grows too high,
it sometimes throws fetch failures, as the batch data could get removed
from memory.
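
For reference, a minimal sketch of where the batch interval is fixed, plus one knob that can keep a receiver from outrunning the processing (values illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("interval-demo")                          // illustrative app name
  .set("spark.streaming.receiver.maxRate", "1000")      // cap records/sec per receiver (illustrative)
val ssc = new StreamingContext(conf, Seconds(10))       // batch interval; processing time should stay below this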

Thanks
Best Regards

On Wed, Apr 1, 2015 at 11:35 AM,  wrote:

> hi guys:
>
>   I got a question when reading
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval
> .
>
>
>
>  What will happen to the streaming data if the batch processing
> time is bigger than the batch interval? Will the next batch data be dalayed
> to process or the unfinished processing job to be discarded?
>
>
>
> thanks for any ideas shared?
>
> 
>
> Thanks&Best regards!
> 罗辉 San.Luo
>


Re: Re: rdd.cache() not working ?

2015-03-31 Thread fightf...@163.com
Hi,
That is exactly the issue. After running person.cache we then run person.count;
however, there is still no cached storage shown in the web UI.

Thanks,
Sun.



fightf...@163.com
 
From: Taotao.Li
Date: 2015-04-01 14:02
To: fightfate
CC: user
Subject: Re: rdd.cache() not working ?
rerun person.count and you will see the performance of cache.

person.cache would not cache it right now. It'll actually cache this RDD after 
one action[person.count here]



From: fightf...@163.com
To: "user" 
Sent: Wednesday, April 1, 2015, 1:21:25 PM
Subject: rdd.cache() not working ?

Hi, all

Running the following code snippet through spark-shell, however cannot see any 
cached storage partitions in web ui.

Does this mean that cache now working ? Cause if we issue person.count again 
that we cannot say any time consuming

performance upgrading. Hope anyone can explain this for a little. 

Best,

Sun.

   case class Person(id: Int, col1: String)

   val person = 
sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p
 => Person(p(0).trim.toInt, p(1)))
   
   person.cache
   
   person.count



fightf...@163.com



-- 
---
Thanks & Best regards
李涛涛 Taotao · Li  |  Fixed Income@Datayes  |  Software Engineer
Address: Wanxiang Tower 8F, Lujiazui West Rd. No. 99, Pudong New District, 
Shanghai, 200120
Phone: 021-60216502   Mobile: +86-18202171279



Re: --driver-memory parameter doesn't work for spark-submmit on yarn?

2015-03-31 Thread Akhil Das
Once you submit the job, do a ps aux | grep spark-submit and see how much
heap space is allocated to the process (the -Xmx params). If you are
seeing a lower value, you could try increasing it yourself with:

export _JAVA_OPTIONS="-Xmx5g"
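
A minimal sketch of that check (pattern and value illustrative):

# See what heap the spark-submit JVM actually got:
ps aux | grep '[s]park-submit' | grep -o -- '-Xmx[^ ]*'
# If it is smaller than expected, force a bigger heap for processes started from this shell:
export _JAVA_OPTIONS="-Xmx5g"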

Thanks
Best Regards

On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng  wrote:

> Hi All,
>
>
>
> Below is the my shell script:
>
>
>
> /home/hadoop/spark/bin/spark-submit --driver-memory=5G
> --executor-memory=40G --master yarn-client --class
> com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar
> s3://bucket/vriscBatchConf.properties
>
>
>
> My driver will load some resources and then broadcast to all executors.
>
>
>
> That resource is only 600MB in serialized form, but I always get an out-of-memory
> exception; it looks like the right amount of memory is not being allocated to my
> driver.
>
>
>
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>
> at java.lang.reflect.Array.newArray(Native Method)
>
> at java.lang.reflect.Array.newInstance(Array.java:70)
>
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> at
> com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)
>
>
>
> Do I do anything wrong here?
>
>
>
> And no matter how much I set for the --driver-memory value (from 512M to
> 20G), it always gives me an error on the same line (the line that tries to load a
> 600MB Java serialization file). So it looks like the script doesn't allocate the
> right memory to the driver in my case?
>
>
>
> Regards,
>
>
>
> Shuai
>


Strategy regarding maximum number of executor failures for long running jobs / spark streaming jobs

2015-03-31 Thread twinkle sachdeva
Hi,

In Spark over YARN, there is a property "spark.yarn.max.executor.failures"
which controls the maximum number of executor failures an application will
survive.

If the number of executor failures (due to any reason, like OOM or machine
failure, etc.) exceeds this value, the application quits.

For short-duration Spark jobs this looks fine, but for long-running jobs,
since this does not take duration into account, it can lead to the same
treatment for two different scenarios (mentioned below):
1. executors failing within 5 minutes;
2. executors failing sparsely, but at some point even a single executor
failure (which the application could have survived) makes the application
quit.

Sending this to the community to hear what kind of behaviour / strategy
they think would be suitable for long-running Spark jobs or Spark Streaming
jobs.
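
For reference, the cap discussed above is set like any other Spark configuration; a sketch (class, jar and value are illustrative):

spark-submit --master yarn-cluster \
  --conf spark.yarn.max.executor.failures=100 \
  --class com.example.MyStreamingApp my-streaming-app.jar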

Thanks and Regards,
Twinkle


回复:Re: SparkStreaming batch processing time question

2015-03-31 Thread luohui20001
hummm, got it. Thank you Akhil.






 
Thanks&Best regards!
罗辉 San.Luo



- Original Message -
From: Akhil Das 
To: 罗辉 
Cc: user 
Subject: Re: SparkStreaming batch processing time question
Date: April 1, 2015, 2:31 PM



It will add scheduling delay for the new batch. The new batch data will be 
processed after finish up the previous batch, when the time is too high, 
sometimes it will throw fetch failures as the batch data could get removed from 
memory.



Thanks 
Best Regards

On Wed, Apr 1, 2015 at 11:35 AM,  wrote:


hi guys:
  I got a question when reading 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval.
 
 What will happen to the streaming data if the batch processing time is 
bigger than the batch interval? Will the next batch data be dalayed to process 
or the unfinished processing job to be discarded?
 
thanks for any ideas shared?





 
Thanks&Best regards!
罗辉 San.Luo