Re: Secondary Sorting in spark using clojure/flambo

2016-07-08 Thread Ashish Negi
Shouldn't `package` in `:import` be the actual package name of
`RFMCPartitioner`?

see examples at https://clojuredocs.org/clojure.core/import

like:

(ns foo.bar
  (:import (java.util Date Calendar)
           (java.util.logging Logger Level)))

In your case that would be:

(ns xyz
  (:import [** RFMCPartitioner]
           [** RFMCKey]))

where ** is the full package name.
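
For example, judging from the package that appears in the stack trace
you posted (org.formcept.wisdom.RFMCKey), that would be:

(ns xyz
  (:import [org.formcept.wisdom RFMCPartitioner RFMCKey]))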



On Friday, 8 July 2016 21:31:27 UTC+5:30, Punit Naik wrote:
>
> I have a Scala program in which I have implemented a secondary sort, and
> it works perfectly. The way I have written that program is:
>
> object rfmc {
>   // Custom Key and partitioner
>
>   case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
>
>   class RFMCPartitioner(partitions: Int) extends Partitioner {
>     require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
>     override def numPartitions: Int = partitions
>     override def getPartition(key: Any): Int = {
>       val k = key.asInstanceOf[RFMCKey]
>       k.cId.hashCode() % numPartitions
>     }
>   }
>
>   object RFMCKey {
>     implicit def orderingBycId[A <: RFMCKey]: Ordering[A] = {
>       Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
>     }
>   }
>
>   // The body of the code
>   //
>   //
>   // cust, r, f, m, c are assumed to be extracted from each input row
>   val x = rdd.map(row => (RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c))
>   val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
> }
>
> I wanted to implement the same thing using Clojure's DSL for Spark,
> flambo. Since I can't write the partitioner in Clojure, I reused the code
> defined above, compiled it, and used it as a dependency in my Clojure code.
>
> Now I am importing the partitioner and the key in my Clojure code in the
> following way:
>
> (ns xyz
>   (:import [package RFMCPartitioner]
>            [package RFMCKey]))
>
> But when I try to create an RFMCKey by doing (RFMCKey. cust_id r f m c),
> it throws the following error:
>
> java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
>     at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
>     at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
>     at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
>     at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
>     at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
>     at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
>     at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>     at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
>     at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
>     at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
>     at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> My guess is that it's not able to find the ordering that I have defined
> after the partitioner. But if it works in Scala, why doesn't it work in
> Clojure?
>



Rationale for `keys` not supporting vectors?

2016-07-08 Thread Michael Gardner
I've looked around, but couldn't find any discussion on the topic. Is it purely 
an implementation thing, or a design choice?

(Yes, I realize you can just do (range (count v)).)
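
For concreteness, a quick REPL sketch of the behavior I mean (the exact
exception may vary by Clojure version):

(keys {:a 1 :b 2})
;;=> (:a :b)

(keys [:x :y :z])
;; throws ClassCastException; vectors are not supported

;; the workaround mentioned above, treating a vector's indices as its "keys":
(range (count [:x :y :z]))
;;=> (0 1 2)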



[ANN] lein-monolith

2016-07-08 Thread Gregory Look
Hi all! I'd like to announce the public availability of a build tool we've 
been using at Amperity to work with multiple Leiningen projects inside a 
single repository. There have been many presentations about monorepos by 
companies like Facebook and Google, but to date the tools for working with 
Clojure projects in a monorepo have been lacking or poorly advertised.

lein-monolith provides a set of tasks to simplify development inside a
monorepo, as well as to apply tasks across the entire set of subproject
sources and tests at once. For an introduction to the project and some of
the motivations behind it, see the following Seajure (Seattle Clojure
meetup) presentation I gave this month:

https://docs.google.com/presentation/d/1jqYG2N2YalWdVG4oDqs1mua4hOyxVD_nejANrg6h8to/present

You can find the source code at:

https://github.com/amperity/lein-monolith
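
As with any Leiningen plugin, you enable it through :plugins in your
project.clj. A minimal sketch of wiring it in (the project name and the
version string below are just placeholders; check Clojars for the current
release):

;; project.clj; "x.y.z" is a placeholder version
(defproject my-monorepo "0.1.0-SNAPSHOT"
  :plugins [[lein-monolith "x.y.z"]])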

Let me know if you find any errors while using it! We'll continue to 
develop and support the plugin.

Greg Look




Secondary Sorting in spark using clojure/flambo

2016-07-08 Thread Punit Naik
I have a Scala program in which I have implemented a secondary sort, and it
works perfectly. The way I have written that program is:

object rfmc {
  // Custom Key and partitioner

  case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)

  class RFMCPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[RFMCKey]
      k.cId.hashCode() % numPartitions
    }
  }

  object RFMCKey {
    implicit def orderingBycId[A <: RFMCKey]: Ordering[A] = {
      Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
    }
  }

  // The body of the code
  //
  //
  // cust, r, f, m, c are assumed to be extracted from each input row
  val x = rdd.map(row => (RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c))
  val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
}

I wanted to implement the same thing using Clojure's DSL for Spark, flambo.
Since I can't write the partitioner in Clojure, I reused the code defined
above, compiled it, and used it as a dependency in my Clojure code.

Now I am importing the partitioner and the key in my Clojure code in the
following way:

(ns xyz
  (:import [package RFMCPartitioner]
           [package RFMCKey]))

But when I try to create an RFMCKey by doing (RFMCKey. cust_id r f m c), it
throws the following error:

java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
    at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My guess is that it's not able to find the ordering that I have defined
after the partitioner. But if it works in Scala, why doesn't it work in
Clojure?
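
If that guess is right, one thing I could try is to sidestep Scala implicit
resolution altogether (Clojure interop never performs it) and hand Spark an
explicit comparator, via the JavaPairRDD overload of
repartitionAndSortWithinPartitions that accepts one. A rough, untested
sketch, where pair-rdd stands for the underlying JavaPairRDD keyed by
RFMCKey:

;; rough, untested sketch: supply the ordering explicitly instead of
;; relying on the implicit in RFMCKey's companion object
(def rfmc-comparator
  (reify java.util.Comparator java.io.Serializable
    (compare [_ a b]
      ;; mirrors Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
      (compare [(.R a) (- (.F a)) (- (.M a)) (- (.C a))]
               [(.R b) (- (.F b)) (- (.M b)) (- (.C b))]))))

;; Spark will need to serialize the comparator when shipping tasks to
;; executors, hence java.io.Serializable above
(.repartitionAndSortWithinPartitions pair-rdd
                                     (RFMCPartitioner. 1)
                                     rfmc-comparator)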
