Re: Can not control bucket files number if it was specified

2016-09-19 Thread Fridtjof Sander
I didn't follow all of this thread, but if you want to have exactly one 
bucket-output-file per RDD-partition, you have to repartition (shuffle) 
your data on the bucket-key.
If you don't repartition (shuffle), you may have records with different 
bucket-keys in the same RDD-partition, leading to more than one 
bucket-output-file for that RDD-partition.
So, in your example from Sep 17, you're missing a res.repartition(8, 
"xxx_id").write...



On 19.09.2016 at 16:26, Qiang Li wrote:
I tried the DataFrame writer with the coalesce or repartition API, but it 
does not meet my requirements: I still get far more files than the bucket 
number, and the Spark jobs are very slow after I add coalesce or repartition.


I've gone back to Hive and use it to do the data conversion.

Thanks.

On Sat, Sep 17, 2016 at 11:12 PM, Mich Talebzadeh wrote:


Ok

You have an external table in Hive on S3 with partitions and
buckets, say

..
PARTITIONED BY (year int, month string)
CLUSTERED BY (prod_id) INTO 256 BUCKETS
STORED AS ORC.

That gives you, within each partition, buckets on prod_id equally
spread over 256 hash partitions; a bucket is the hash partitioning
within a Hive table partition.

Now my question is how you force data for a given partition p into
bucket n. Since you have already specified, say, 256 buckets, then
whatever prod_id is, it still has to go to one of the 256 buckets.

Within Spark, the number of files is actually the number of
underlying RDD partitions. You can find this out by invoking
toJavaRDD.partitions.size() and force a certain number of
partitions by using coalesce(n) or something like that.
However, I am not sure the output will be what you expect it to be.

It is worth trying to sort it out the way you want with, say, 8 partitions:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val s = spark.read.parquet("oraclehadoop.sales2")
s.coalesce(8).registerTempTable("tmp")
HiveContext.sql("SELECT * FROM tmp SORT BY
prod_id").write.mode("overwrite").parquet("test.sales6")


It may work.

HTH



Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com





On 17 September 2016 at 15:00, Qiang Li wrote:

I want to run a job to load existing data from one S3 bucket,
process it, then store it to another bucket with partitioning and
bucketing (data format conversion from tsv to parquet with gzip).
So the source data and results are both in S3; what differs is the
tool I use to process the data.

First I processed the data with Hive: I created external tables with
an S3 location, partitioning and a bucket number, and the jobs
generated files under each partition directory whose count equaled
the bucket number.
Then everything was OK, and I could also use Hive/Presto/Spark to run
other jobs on the result data in S3.

But if I run a Spark job with the partition, bucket and sort
features, the Spark job generates far more files than the bucket
number under each partition directory, so Presto or Hive cannot
recognize the buckets, because the file count does not equal the
bucket number written by the Spark job.

for example:
...
val options = Map("path" -> "result_bucket_path", "compression" -> "gzip")
res.write.mode("append").format("parquet")
  .partitionBy("year", "month", "day")
  .bucketBy(8, "xxx_id")
  .sortBy("xxx_id")
  .options(options)
  .saveAsTable("result_bucket_name")
...

The number of result bucket files under each partition is far more than 8.


On Sat, Sep 17, 2016 at 9:27 PM, Mich Talebzadeh wrote:

It is difficult to guess what is happening with your data.

First, when you say you use Spark to generate test data, are
these selected randomly and then stored in a Hive/etc. table?

HTH

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


Re: How to make the result of sortByKey distributed evenly?

2016-09-06 Thread Fridtjof Sander
Your data has only two keys, and basically all values are assigned to 
only one of them. There is no better way to distribute the keys than 
the one Spark executes.


What you have to do is use different keys to sort and range-partition 
on. Try to invoke sortBy() on a non-pair-RDD. This will take both parts 
of your data as the key to sort on. You can also set your tuple as the key 
manually, and set a constant int or something as the value.
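In code, the two suggestions could look roughly like this (an untested sketch; sc and n as in the quoted mail below):

val pairs = sc.parallelize(2 to n).map(x => (x / n, x))

// 1) sortBy on the whole tuple instead of sortByKey: the range partitioner then
//    sees many distinct sort keys and can spread the records across partitions.
val sorted1 = pairs.sortBy(identity)

// 2) equivalently, make the tuple itself the key and attach a dummy value.
val sorted2 = pairs.map(t => (t, 1)).sortByKey().keys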


On 06.09.16 at 10:13, Zhang, Liyun wrote:


Hi all:

  I have a question about RDD.sortByKey

val n=2
val sorted=sc.parallelize(2 to n).map(x=>(x/n,x)).sortByKey()
 sorted.saveAsTextFile("hdfs://bdpe42:8020/SkewedGroupByTest")

sc.parallelize(2 to n).map(x=>(x/n,x)) will generate pairs like 
[(0,2),(0,3),…..,(0,1),(1,2)], the key is skewed.


The result of sortByKey is expected to be distributed evenly. But when I 
viewed the result, I found that part-0 is large and part-1 is 
small.


 hadoop fs -ls /SkewedGroupByTest/
16/09/06 03:24:55 WARN util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes 
where applicable

Found 3 items
-rw-r--r-- 1 root supergroup 0 2016-09-06 03:21 /SkewedGroupByTest/_SUCCESS
-rw-r--r-- 1 root supergroup 188878 2016-09-06 03:21 /SkewedGroupByTest/part-0
-rw-r--r-- 1 root supergroup 10 2016-09-06 03:21 /SkewedGroupByTest/part-1


How can I get the result distributed evenly? I don't need the keys 
within each part-x to be the same; I only need to guarantee that the data in 
part-0 ~ part-x is sorted.


Thanks for any help!

Kelly Zhang/Zhang,Liyun

Best Regards





Re: Splitting columns from a text file

2016-09-05 Thread Fridtjof Sander

Ask yourself how to access the third element in an array in Scala.
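A minimal, untested sketch of that hint, using the sample data and file name from the mails quoted below:

// after split, each line becomes an Array[String]; the third column is index 2
val textFile = sc.textFile("/tmp/mytextfile.txt")

val filtered = textFile
  .map(_.split(","))
  .filter(cols => cols(2).toDouble > 50.0)   // keep rows whose third column exceeds 50.0

filtered.take(5).foreach(cols => println(cols.mkString(",")))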


On 05.09.2016 at 16:14, Ashok Kumar wrote:

Hi,
I want to filter them by value.

This is what is in the array:

74,20160905-133143,98.11218069128827594148

I want to filter anything > 50.0 in the third column

Thanks




On Monday, 5 September 2016, 15:07, ayan guha  wrote:


Hi

x.split returns an array. So, after the first map, you will get an RDD of 
arrays. What is your expected outcome of the 2nd map?


On Mon, Sep 5, 2016 at 11:30 PM, Ashok Kumar wrote:


Thank you sir.

This is what I get

scala> textFile.map(x=> x.split(","))
res52: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[27] at map at <console>:27

How can I work on individual columns? I understand they are strings.

scala> textFile.map(x=> x.split(",")).map(x => (x.getString(0))
     | )
<console>:27: error: value getString is not a member of Array[String]
       textFile.map(x=> x.split(",")).map(x => (x.getString(0))

regards




On Monday, 5 September 2016, 13:51, Somasundaram Sekar wrote:


Basic error: you get back an RDD on transformations like map.
sc.textFile("filename").map(x => x.split(","))

On 5 Sep 2016 6:19 pm, "Ashok Kumar"
 wrote:

Hi,

I have a text file as below that I read in

74,20160905-133143,98.11218069128827594148
75,20160905-133143,49.52776998815916807742
76,20160905-133143,56.08029957123980984556
77,20160905-133143,46.63689526544407522777
78,20160905-133143,84.88227141164402181551
79,20160905-133143,68.72408602520662115000

val textFile = sc.textFile("/tmp/mytextfile.txt")

Now I want to split the rows separated by ","

scala> textFile.map(x=>x.toString).split(",")
<console>:27: error: value split is not a member of org.apache.spark.rdd.RDD[String]
       textFile.map(x=>x.toString).split(",")

However, the above throws an error.

Any ideas what is wrong, or how I can do this if I can avoid
converting it to a String?

Thanking






--
Best Regards,
Ayan Guha






Re: any idea what this error could be?

2016-09-03 Thread Fridtjof Sander
I see. The default Scala version changed to 2.11 with Spark 2.0.0 afaik, so 
that's probably the version you get when downloading the prepackaged binaries. Glad 
I could help ;)

On 3 September 2016 at 23:59:51 CEST, kant kodali <kanth...@gmail.com> wrote:
>@Fridtjof you are right!
>Changing it to this fixed it!
>compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
>compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
>
>
>
>
>
>
>
>On Sat, Sep 3, 2016 12:30 PM, kant kodali kanth...@gmail.com
>wrote:
>I increased the memory but nothing has changed; I still get the same
>error.
>@Fridtjof on my driver side I am using the following dependencies:
>compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '2.0.0'
>compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '2.0.0'
>On the executor side I don't know what jars are being used, but I have
>installed Spark using this zip file: spark-2.0.0-bin-hadoop2.7.tgz
> 
>
>
>
>
>
>On Sat, Sep 3, 2016 4:20 AM, Fridtjof Sander
>fridtjof.san...@googlemail.com
>wrote:
>There is an InvalidClassException complaining about non-matching
>serialVersionUIDs. Shouldn't that be caused by different jars on
>executors and
>driver?
>
>
>On 03.09.2016 at 1:04 PM, "Tal Grynbaum" <tal.grynb...@gmail.com> wrote:
>My guess is that you're running out of memory somewhere.  Try to
>increase the
>driver memory and/or executor memory.   
>
>
>On Sat, Sep 3, 2016, 11:42 kant kodali <kanth...@gmail.com> wrote:
>I am running this on aws.
>
> 
>
>
>
>
>
>On Fri, Sep 2, 2016 11:49 PM, kant kodali kanth...@gmail.com
>wrote:
>I am running spark in stand alone mode. I guess this error when I run
>my driver
>program..I am using spark 2.0.0. any idea what this error could be?
>
>
>Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>16/09/02 23:44:44 INFO SparkContext: Running Spark version 2.0.0
>16/09/02 23:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>16/09/02 23:44:45 INFO SecurityManager: Changing view acls to: kantkodali
>16/09/02 23:44:45 INFO SecurityManager: Changing modify acls to: kantkodali
>16/09/02 23:44:45 INFO SecurityManager: Changing view acls groups to:
>16/09/02 23:44:45 INFO SecurityManager: Changing modify acls groups to:
>16/09/02 23:44:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kantkodali); groups with view permissions: Set(); users with modify permissions: Set(kantkodali); groups with modify permissions: Set()
>16/09/02 23:44:45 INFO Utils: Successfully started service 'sparkDriver' on port 62256.
>16/09/02 23:44:45 INFO SparkEnv: Registering MapOutputTracker
>16/09/02 23:44:45 INFO SparkEnv: Registering BlockManagerMaster
>16/09/02 23:44:45 INFO DiskBlockManager: Created local directory at /private/var/folders/_6/lfxt933j3bd_xhq0m7dwm8s0gn/T/blockmgr-b56eea49-0102-4570-865a-1d3d230f0ffc
>16/09/02 23:44:45 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
>16/09/02 23:44:45 INFO SparkEnv: Registering OutputCommitCoordinator
>16/09/02 23:44:45 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>16/09/02 23:44:45 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.191:4040
>16/09/02 23:44:45 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://52.43.37.223:7077...
>16/09/02 23:44:46 INFO TransportClientFactory: Successfully created connection to /52.43.37.223:7077 after 70 ms (0 ms spent in bootstraps)
>16/09/02 23:44:46 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 52.43.37.223:7077
>org.apache.spark.SparkException: Exception thrown in awaitResult
>at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
>at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
>at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(Standalone

Re: any idea what this error could be?

2016-09-03 Thread Fridtjof Sander
There is an InvalidClassException complaining about non-matching
serialVersionUIDs. Shouldn't that be caused by different jars on executors
and driver?

On 03.09.2016 at 1:04 PM, "Tal Grynbaum" wrote:

> My guess is that you're running out of memory somewhere.  Try to increase
> the driver memory and/or executor memory.
>
> On Sat, Sep 3, 2016, 11:42 kant kodali  wrote:
>
>> I am running this on aws.
>>
>>
>>
>> On Fri, Sep 2, 2016 11:49 PM, kant kodali kanth...@gmail.com wrote:
>>
>>> I am running spark in stand alone mode. I guess this error when I run my
>>> driver program..I am using spark 2.0.0. any idea what this error could be?
>>>
>>>
>>>
>>>   Using Spark's default log4j profile: 
>>> org/apache/spark/log4j-defaults.properties
>>> 16/09/02 23:44:44 INFO SparkContext: Running Spark version 2.0.0
>>> 16/09/02 23:44:44 WARN NativeCodeLoader: Unable to load native-hadoop 
>>> library for your platform... using builtin-java classes where applicable
>>> 16/09/02 23:44:45 INFO SecurityManager: Changing view acls to: kantkodali
>>> 16/09/02 23:44:45 INFO SecurityManager: Changing modify acls to: kantkodali
>>> 16/09/02 23:44:45 INFO SecurityManager: Changing view acls groups to:
>>> 16/09/02 23:44:45 INFO SecurityManager: Changing modify acls groups to:
>>> 16/09/02 23:44:45 INFO SecurityManager: SecurityManager: authentication 
>>> disabled; ui acls disabled; users  with view permissions: Set(kantkodali); 
>>> groups with view permissions: Set(); users  with modify permissions: 
>>> Set(kantkodali); groups with modify permissions: Set()
>>> 16/09/02 23:44:45 INFO Utils: Successfully started service 'sparkDriver' on 
>>> port 62256.
>>> 16/09/02 23:44:45 INFO SparkEnv: Registering MapOutputTracker
>>> 16/09/02 23:44:45 INFO SparkEnv: Registering BlockManagerMaster
>>> 16/09/02 23:44:45 INFO DiskBlockManager: Created local directory at 
>>> /private/var/folders/_6/lfxt933j3bd_xhq0m7dwm8s0gn/T/blockmgr-b56eea49-0102-4570-865a-1d3d230f0ffc
>>> 16/09/02 23:44:45 INFO MemoryStore: MemoryStore started with capacity 
>>> 2004.6 MB
>>> 16/09/02 23:44:45 INFO SparkEnv: Registering OutputCommitCoordinator
>>> 16/09/02 23:44:45 INFO Utils: Successfully started service 'SparkUI' on 
>>> port 4040.
>>> 16/09/02 23:44:45 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
>>> http://192.168.0.191:4040
>>> 16/09/02 23:44:45 INFO StandaloneAppClient$ClientEndpoint: Connecting to 
>>> master spark://52.43.37.223:7077...
>>> 16/09/02 23:44:46 INFO TransportClientFactory: Successfully created 
>>> connection to /52.43.37.223:7077 after 70 ms (0 ms spent in bootstraps)
>>> 16/09/02 23:44:46 WARN StandaloneAppClient$ClientEndpoint: Failed to 
>>> connect to master 52.43.37.223:7077
>>> org.apache.spark.SparkException: Exception thrown in awaitResult
>>> at 
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>>> at 
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>>> at 
>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>>> at 
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>> at 
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>> at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
>>> at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
>>> at 
>>> org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:109)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: java.io.InvalidClassException: 
>>> org.apache.spark.rpc.netty.RequestMessage; local class incompatible: stream 
>>> classdesc serialVersionUID = -2221986757032131007, local class 
>>> serialVersionUID = -5447855329526097695
>>> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
>>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
>>>
>>>


Re: Grouping on bucketed and sorted columns

2016-09-02 Thread Fridtjof Sander
I succeeded in doing some experimental evaluation, and it seems I 
understood the code correctly:
A partition that consists of Hive buckets is read bucket-file by 
bucket-file, which leads to the loss of the internal sorting.


Does anyone have an opinion about my alternative idea of reading from 
multiple bucket-files simultaneously to keep that ordering?
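To make that idea concrete, here is an illustrative, Spark-free sketch of the merge step (my own toy code, not FileScanRDD's actual reader): each input iterator stands for one bucket-file that is already sorted by key, and merging their heads keeps the output sorted, so equal keys arrive adjacently and groups can be emitted in a streaming fashion instead of after a blocking sort.

import scala.collection.mutable

def mergeSortedBuckets[K, V](bucketFiles: Seq[Iterator[(K, V)]])
                            (implicit ord: Ordering[K]): Iterator[(K, V)] =
  new Iterator[(K, V)] {
    // one buffered iterator per bucket-file; exhausted ones are dropped
    private val heads =
      mutable.ArrayBuffer(bucketFiles.map(_.buffered).filter(_.hasNext): _*)

    def hasNext: Boolean = heads.nonEmpty

    def next(): (K, V) = {
      // pick the bucket-file whose current head record has the smallest key
      val i = heads.indices.minBy(j => heads(j).head._1)
      val record = heads(i).next()
      if (!heads(i).hasNext) heads.remove(i)
      record
    }
  }

// e.g. mergeSortedBuckets(Seq(Iterator(1 -> "a", 3 -> "c"), Iterator(2 -> "b"))).toList
//      == List(1 -> "a", 2 -> "b", 3 -> "c")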


Regarding the followup questions:

1. I found the `collect_list()` function, which seems to provide what I 
want. However, I fail to collect more than one column. Is there a way to 
do basically .agg(collect_list("*"))?


2. I worked around that problem by writing and reading the table within 
the same context/session, so that the ephemeral metastore doesn't lose 
its content. However, in general a Hive metastore seems to be required 
for production usage, since only an ephemeral and a Hive catalog 
implementation are available in 2.0.0.
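For reference, an untested sketch of the non-ephemeral route mentioned above: with Hive support enabled, saveAsTable stores the table (including its bucketing spec) in a persistent metastore, so a later session backed by the same metastore can read it back. This assumes the Hive classes are on the classpath; the app name and input path are made up for illustration.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketed-table-test")
  .enableHiveSupport()             // back the catalog by a persistent Hive metastore
  .getOrCreate()

spark.read.json("/tmp/input")      // hypothetical input
  .write
  .partitionBy("date")
  .bucketBy(4, "id")
  .sortBy("id")
  .format("json")
  .saveAsTable("table")

// later, in another session against the same metastore:
val t = spark.table("table")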


I would highly appreciate some feedback to my thoughts and questions

On 31.08.2016 at 14:45, Fridtjof Sander wrote:

Hi Spark users,

I'm currently investigating spark's bucketing and partitioning 
capabilities and I have some questions:


Let T be a table that is bucketed and sorted by T.id and 
partitioned by T.date. Before persisting, T has been repartitioned 
by T.id to get only one file per bucket.

I want to group by T.id over a subset of T.date's values.

It seems to me that the best execution plan in this scenario would be 
the following:
- Schedule one stage (no exchange) with as many tasks as we have 
bucket-ids, so that there is a mapping from each task to a bucket-id
- Each task opens all bucket-files belonging to "its" bucket-id 
simultaneously, which is one per affected partition T.date
- Since the data inside the buckets is sorted, we can perform the 
second phase of "two-phase multiway merge sort" to get our groups, 
which can be "pipelined" into the next operator


From what I understand after scanning through the code, however, it 
appears to me that each bucket-file is read completely before the 
record-iterator is advanced to the next bucket-file (see FileScanRDD; 
the same applies to Hive). So a groupBy would require sorting the 
partitions of the resulting RDD before the groups can be emitted, 
which results in a blocking operation.


Could anyone confirm that I'm assessing the situation correctly here, 
or correct me if not?


Followup questions:

1. Is there a way to get the "sql" groups into the RDD API, like the 
RDD groupBy would return them? I fail to formulate a job like this, 
because a query with a groupBy that lacks an aggregation function is 
invalid.
2. I haven't simply tested this, because I fail to load a table with 
the specified properties like above:

After writing a table like this:
.write().partitionBy("date").bucketBy(4,"id").sortBy("id").format("json").saveAsTable("table");
I fail to read it back with the partitioning and bucketing being 
recognized.
Is a functioning Hive-Metastore required for this to work, or is there 
a workaround?


I hope someone can spare the time to help me out here.

All the best,
Fridtjof





Grouping on bucketed and sorted columns

2016-08-31 Thread Fridtjof Sander

Hi Spark users,

I'm currently investigating spark's bucketing and partitioning 
capabilities and I have some questions:


Let T be a table that is bucketed and sorted by T.id and partitioned 
by T.date. Before persisting, T has been repartitioned by T.id to 
get only one file per bucket.

I want to group by T.id over a subset of T.date's values.

It seems to me that the best execution plan in this scenario would be 
the following:
- Schedule one stage (no exchange) with as many tasks as we have 
bucket-ids, so that there is a mapping from each task to a bucket-id
- Each task opens all bucket-files belonging to "its" bucket-id 
simultaneously, which is one per affected partition T.date
- Since the data inside the buckets is sorted, we can perform the 
second phase of "two-phase multiway merge sort" to get our groups, which 
can be "pipelined" into the next operator


From what I understand after scanning through the code, however, it 
appears to me that each bucket-file is read completely before the 
record-iterator is advanced to the next bucket-file (see FileScanRDD; 
the same applies to Hive). So a groupBy would require sorting the partitions 
of the resulting RDD before the groups can be emitted, which results in 
a blocking operation.


Could anyone confirm that I'm assessing the situation correctly here, or 
correct me if not?


Followup questions:

1. Is there a way to get the "sql" groups into the RDD API, like the RDD 
groupBy would return them? I fail to formulate a job like this, because 
a query with a groupBy that lacks an aggregation function is invalid.
2. I haven't simply tested this, because I fail to load a table with the 
specified properties like above:

After writing a table like this:

.write().partitionBy("date").bucketBy(4,"id").sortBy("id").format("json").saveAsTable("table");

I fail to read it back with the partitioning and bucketing being 
recognized.
Is a functioning Hive-Metastore required for this to work, or is there a 
workaround?


I hope someone can spare the time to help me out here.

All the best,
Fridtjof




Re: Isotonic Regression, run method overloaded Error

2016-07-11 Thread Fridtjof Sander
Spark's implementation performs PAVA on each partition, only to then 
collect each result to the driver and perform PAVA again on the 
collected results. The hope is that enough data gets pooled, so 
that the last step does not exceed the driver's memory limits. This 
assumption does of course not hold in general. Just consider what 
happens if the data is already correctly sorted: in that case nothing 
is pooled and the model size roughly equals the data size. Spark's IR model 
saves boundaries and predictions as double arrays, so the 
(unpooled) data has to fit into memory.


https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
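To make the pooling argument concrete, here is a minimal, Spark-free PAVA sketch (my own illustration, not the linked MLlib code): the input is (label, weight) pairs already sorted by feature value, and adjacent points that violate monotonicity are pooled into blocks carrying their weighted mean. If the input is already isotonic, nothing gets pooled, which is exactly the case where the collected "model" is as large as the data.

def pava(points: Seq[(Double, Double)]): Seq[(Double, Double)] = {
  // each block: (weighted mean, total weight)
  val blocks = scala.collection.mutable.ArrayBuffer.empty[(Double, Double)]
  for ((label, weight) <- points) {
    blocks += ((label, weight))
    // merge while the last block's mean drops below the previous block's mean
    while (blocks.length > 1 && blocks(blocks.length - 1)._1 < blocks(blocks.length - 2)._1) {
      val (m2, w2) = blocks.remove(blocks.length - 1)
      val (m1, w1) = blocks.remove(blocks.length - 1)
      blocks += (((m1 * w1 + m2 * w2) / (w1 + w2), w1 + w2))
    }
  }
  blocks.toSeq
}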

On 11.07.2016 at 17:06, Yanbo Liang wrote:
IsotonicRegression can handle a feature column of vector type. It will 
extract a certain index (controlled by the param "featureIndex") of 
this feature vector and feed it into model training. It will perform 
the pool adjacent violators algorithm on each partition, so it's 
distributed and the data does not need to fit into the memory of a 
single machine.

The following code snippets can work well on my machine:

val labels = Seq(1, 2, 3, 1, 6, 17, 16, 17, 18)
val dataset = spark.createDataFrame(
  labels.zipWithIndex.map { case (label, i) =>
    (label, Vectors.dense(Array(i.toDouble, i.toDouble + 1.0)), 1.0)
  }
).toDF("label", "features", "weight")
val ir = new IsotonicRegression().setIsotonic(true)
val model = ir.fit(dataset)
val predictions = model.transform(dataset).select("prediction").rdd.map {
  case Row(pred) => pred
}.collect()
assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))


Thanks
Yanbo

2016-07-11 6:14 GMT-07:00 Fridtjof Sander <fridtjof.san...@googlemail.com>:


Hi Swaroop,

from my understanding, Isotonic Regression is currently limited to
data with 1 feature plus weight and label. Also, the entire data is
required to fit into the memory of a single machine.
I did some work on the latter issue but discontinued the project,
because I felt no one really needed it. I'd be happy to resume my
work on Spark's IR implementation, but I fear there won't be a
quick fix for your issue.

Fridtjof


On 08.07.2016 at 22:38, dsp wrote:

Hi I am trying to perform Isotonic Regression on a data set
with 9 features
and a label.
When I run the algorithm similarly to the way mentioned on the
MLlib page, I get the following error:

error: overloaded method value run with alternatives:
(input: org.apache.spark.api.java.JavaRDD[(java.lang.Double,
java.lang.Double,

java.lang.Double)])org.apache.spark.mllib.regression.IsotonicRegressionModel

   (input: org.apache.spark.rdd.RDD[(scala.Double, scala.Double,
scala.Double)])org.apache.spark.mllib.regression.IsotonicRegressionModel
  cannot be applied to
(org.apache.spark.rdd.RDD[(scala.Double, scala.Double,
scala.Double, scala.Double, scala.Double, scala.Double,
scala.Double,
scala.Double, scala.Double, scala.Double, scala.Double,
scala.Double,
scala.Double)])
val model = new IsotonicRegression().setIsotonic(true).run(training)

From the way it is given in the sample code, it looks like it can
be done only for a dataset with a single feature, because the
run() method accepts only three-element tuples, of which one is
the label and one a weight with a default value, leaving place
for only one feature variable.
So, how can this be done for multiple variables?

Regards,
Swaroop



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Isotonic-Regression-run-method-overloaded-Error-tp27313.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.










Re: Isotonic Regression, run method overloaded Error

2016-07-11 Thread Fridtjof Sander

Hi Swaroop,

from my understanding, Isotonic Regression is currently limited to data 
with 1 feature plus weight and label. Also, the entire data is required 
to fit into the memory of a single machine.
I did some work on the latter issue but discontinued the project, 
because I felt no one really needed it. I'd be happy to resume my work 
on Spark's IR implementation, but I fear there won't be a quick fix for 
your issue.
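For reference, a rough, untested sketch of the shape the current mllib run() signature accepts; the toy data, the featureIndex choice and the SparkContext named sc below are made up for illustration:

import org.apache.spark.mllib.regression.IsotonicRegression
import org.apache.spark.rdd.RDD

// run() takes (label, feature, weight) triples, i.e. exactly one feature column.
// With 9 features you would have to select a single one (featureIndex is an
// illustrative choice here), or train one model per feature.
val featureIndex = 0

// toy (label, 9-feature) records, assuming an existing SparkContext named sc
val data: RDD[(Double, Array[Double])] =
  sc.parallelize(Seq((1.0, Array.fill(9)(0.5)), (2.0, Array.fill(9)(1.5))))

val training: RDD[(Double, Double, Double)] =
  data.map { case (label, features) => (label, features(featureIndex), 1.0) }  // weight 1.0

val model = new IsotonicRegression().setIsotonic(true).run(training)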


Fridtjof

On 08.07.2016 at 22:38, dsp wrote:

Hi I am trying to perform Isotonic Regression on a data set with 9 features
and a label.
When I run the algorithm similarly to the way mentioned on the MLlib page, I get
the following error:

error: overloaded method value run with alternatives:
(input: org.apache.spark.api.java.JavaRDD[(java.lang.Double,
java.lang.Double,
java.lang.Double)])org.apache.spark.mllib.regression.IsotonicRegressionModel

   (input: org.apache.spark.rdd.RDD[(scala.Double, scala.Double,
scala.Double)])org.apache.spark.mllib.regression.IsotonicRegressionModel
  cannot be applied to (org.apache.spark.rdd.RDD[(scala.Double, scala.Double,
scala.Double, scala.Double, scala.Double, scala.Double, scala.Double,
scala.Double, scala.Double, scala.Double, scala.Double, scala.Double,
scala.Double)])
val model = new IsotonicRegression().setIsotonic(true).run(training)

From the way it is given in the sample code, it looks like it can be done only
for a dataset with a single feature, because the run() method accepts only
three-element tuples, of which one is the label and one a weight with a default
value, leaving place for only one feature variable.
So, how can this be done for multiple variables?

Regards,
Swaroop



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Isotonic-Regression-run-method-overloaded-Error-tp27313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




