Re: Secondary Sorting in Spark using Clojure/flambo
Shouldn't `package` in the `:import` be the actual package name of `RFMCPartitioner`? See the examples at https://clojuredocs.org/clojure.core/import, e.g.:

(ns foo.bar
  (:import (java.util Date Calendar)
           (java.util.logging Logger Level)))

So your import should look like:

(ns xyz
  (:import [** RFMCPartitioner]
           [** RFMCKey]))

where ** is the full package name.

On Friday, 8 July 2016 21:31:27 UTC+5:30, Punit Naik wrote:
>
> I have a scala program in which I have implemented a secondary sort which
> works perfectly. [...]
>
> Now I am importing the partitioner and the key in my clojure code the
> following way:
>
> (ns xyz
>   (:import
>     [package RFMCPartitioner]
>     [package RFMCKey]))
>
> But when I try to create RFMCKey by doing (RFMCKey. cust_id r f m c), it
> throws the following error:
>
> java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to
> java.lang.Comparable
> [...]
>
> My guess is that its not able to find the ordering that I have defined
> after the partitioner. But if it works in Scala, why doesn't it work in
> Clojure?

-- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.
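[Editor's note] The `ClassCastException` in the thread above comes from Spark falling back to its natural ordering, which requires the key type to implement `java.lang.Comparable`. The implicit `Ordering` in the `RFMCKey` companion object is resolved by the Scala compiler at the Scala call site; when the key is constructed and shuffled from Clojure, no such implicit resolution happens, so the fallback kicks in. One plausible workaround is to implement `Comparable` directly on the key class. A hypothetical Java sketch of the same ordering (R ascending, then F, M, C descending) — class and field names here are illustrative, not the poster's actual code:

```java
// Hypothetical key class that Spark's natural ordering can use:
// implementing Comparable replaces the Scala implicit Ordering.
public class RfmcKeyExample implements Comparable<RfmcKeyExample> {
    final String cId;
    final double r, f, m, c;

    RfmcKeyExample(String cId, double r, double f, double m, double c) {
        this.cId = cId;
        this.r = r;
        this.f = f;
        this.m = m;
        this.c = c;
    }

    @Override
    public int compareTo(RfmcKeyExample o) {
        // Mirrors Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1)):
        // R ascending, then F, M, C descending.
        int cmp = Double.compare(r, o.r);
        if (cmp != 0) return cmp;
        cmp = Double.compare(o.f, f); // reversed args => descending
        if (cmp != 0) return cmp;
        cmp = Double.compare(o.m, m);
        if (cmp != 0) return cmp;
        return Double.compare(o.c, c);
    }

    public static void main(String[] args) {
        RfmcKeyExample a = new RfmcKeyExample("c1", 1.0, 5.0, 2.0, 3.0);
        RfmcKeyExample b = new RfmcKeyExample("c1", 1.0, 4.0, 2.0, 3.0);
        // Equal R; higher F sorts first because F is descending.
        System.out.println(a.compareTo(b) < 0); // prints true
    }
}
```

In the actual Scala code the equivalent change would be to have `case class RFMCKey` extend `Ordered[RFMCKey]` with the same comparison, so the ordering travels with the class file rather than with a compiler-resolved implicit.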
Rationale for `keys` not supporting vectors?
I've looked around, but couldn't find any discussion on the topic. Is it purely an implementation thing, or a design choice? (Yes, I realize you can just do (range (count v)).)
[ANN] lein-monolith
Hi all! I'd like to announce the public availability of a build tool we've been using at Amperity to work with multiple Leiningen projects inside a single repository. There have been many presentations about monorepos by companies like Facebook and Google, but to date the tools for working with Clojure projects in a monorepo have been lacking or poorly advertised.

lein-monolith provides a set of tasks to simplify development inside a monorepo, as well as to apply tasks to the entire set of subproject sources and tests at once. For an introduction to the project and some of the motivations behind it, see the following Seajure (Seattle Clojure meetup) presentation I gave this month:

https://docs.google.com/presentation/d/1jqYG2N2YalWdVG4oDqs1mua4hOyxVD_nejANrg6h8to/present

You can find the source code at:

https://github.com/amperity/lein-monolith

Let me know if you find any errors while using it! We'll continue to develop and support the plugin.

Greg Look
Secondary Sorting in Spark using Clojure/flambo
I have a Scala program in which I have implemented a secondary sort, and it works perfectly. The way I have written that program is:

object rfmc {
  // Custom key and partitioner
  case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)

  class RFMCPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[RFMCKey]
      k.cId.hashCode() % numPartitions
    }
  }

  object RFMCKey {
    implicit def orderingBycId[A <: RFMCKey]: Ordering[A] = {
      Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
    }
  }

  // The body of the code
  val x = rdd.map(RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c)
  val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
}

I wanted to implement the same thing using Clojure's DSL for Spark, called flambo. Since I can't write a partitioner in Clojure, I reused the code defined above, compiled it, and used it as a dependency in my Clojure code.

Now I am importing the partitioner and the key in my Clojure code the following way:

(ns xyz
  (:import
    [package RFMCPartitioner]
    [package RFMCKey]))

But when I try to create an RFMCKey by doing (RFMCKey. cust_id r f m c), it throws the following error:

java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
    at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My guess is that it's not able to find the ordering that I have defined after the partitioner. But if it works in Scala, why doesn't it work in Clojure?
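[Editor's note] Separate from the `Comparable` issue, the partitioner above has a latent bug: `k.cId.hashCode() % numPartitions` can be negative, because `String.hashCode()` may return a negative value, while Spark requires `getPartition` to return an index in `[0, numPartitions)`. A hedged Java sketch of the usual non-negative modulo guard (method and class names here are illustrative):

```java
public class PartitionMod {
    // Map an arbitrary (possibly negative) hash code into [0, numPartitions).
    static int nonNegativeMod(int hash, int numPartitions) {
        int raw = hash % numPartitions; // Java's % keeps the dividend's sign
        return raw < 0 ? raw + numPartitions : raw;
    }

    public static void main(String[] args) {
        // A negative hash would otherwise yield a negative partition index;
        // the guard keeps it valid either way.
        System.out.println(nonNegativeMod(-7, 4)); // prints 1
        System.out.println(nonNegativeMod(7, 4));  // prints 3
    }
}
```

The equivalent one-line change in the Scala `getPartition` would be to add `numPartitions` back when the remainder is negative before returning it.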