Migration issue upgrading from Spark 2.4.5 to Spark 3.0.1

2020-12-10 Thread Nathan Kronenfeld
Hi, everyone.

We are in the process of upgrading our Spark version, and are hitting what
appears to be the bug described in
https://issues.apache.org/jira/browse/SPARK-29497 - namely, that when we
pass anonymous functions to the workers, we get an error

Caused by: java.lang.ClassCastException: cannot assign instance
of java.lang.invoke.SerializedLambda to field 
of type scala.Function1 in instance of 

As we are using Gradle instead of Maven, the suggested fixes there, even as
far as they go, don't work for us.

Has anyone else seen this? Does anyone have any suggestions on how to get
around it, other than to stop passing around anonymous functions?
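
(For what it's worth, the crudest form of that workaround - a sketch only, and
not a fix for the underlying classpath issue in SPARK-29497 - is to pass a
concrete Function1 class instead of a lambda, since a plain class serializes
as an ordinary object rather than a SerializedLambda. The RDD and the function
here are purely illustrative:)

// a named Function1 implementation instead of an anonymous function
class Doubler extends (Int => Int) with Serializable {
  def apply(x: Int): Int = x * 2
}

// instead of rdd.map(x => x * 2):
val doubled = rdd.map(new Doubler)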

Thanks,

    - Nathan Kronenfeld
- Uncharted Software


Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-13 Thread Nathan Kronenfeld
It's a bit of a pain, but you could just use an outer join (assuming there
are no duplicates in the input datasets, of course):

import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.FunSpec

class QuestionSpec extends FunSpec with SharedSparkSession {
  describe("spark list question") {
    it("should join based on id with one row only per id, based on the first dataset") {
      import testImplicits._
      import org.apache.spark.sql.functions.when

      val ds1 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 1 record 1"),
        QuestionRecord(2, "dataset 1 record 2"),
        QuestionRecord(4, "dataset 1 record 3"),
        QuestionRecord(6, "dataset 1 record 4"),
        QuestionRecord(8, "dataset 1 record 5")
      ))
      val ds2 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 2 record 1"),
        QuestionRecord(3, "dataset 2 record 2"),
        QuestionRecord(6, "dataset 2 record 3"),
        QuestionRecord(9, "dataset 2 record 4"),
        QuestionRecord(12, "dataset 2 record 5")
      ))

      val allColumns = ds1.columns

      // Merge the datasets
      val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")

      // Form new columns with the required value
      val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
        ds.withColumn(s"new_$nextColName",
          when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
      }

      // Drop old columns
      val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
        ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
      }.drop("id")

      // And get rid of our new_ marker
      val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
        ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
      }

      ds6.show()
    }
  }
}

case class QuestionRecord (id: Int, payload: String)
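
(A different option, not from the original reply: a sketch that tags each
input with a priority column and keeps the first-priority row per id with a
window function, assuming ds1/ds2 as above:)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

// rows from ds1 get priority 1, rows from ds2 get priority 2
val w = Window.partitionBy("id").orderBy(col("_priority"))

val deduped = ds1.withColumn("_priority", lit(1))
  .union(ds2.withColumn("_priority", lit(2)))
  .withColumn("_rn", row_number().over(w))
  .filter(col("_rn") === 1)          // keep the ds1 row wherever both have the id
  .drop("_priority", "_rn")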

On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada 
wrote:

> Hi,
>
> I am trying to take union of 2 dataframes and then drop duplicate based on
> the value of a specific column. But, I want to make sure that while
> dropping duplicates, the rows from first data frame are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
>
>
>


Problem upgrading from 2.3.1 to 2.4.3 with gradle

2019-09-09 Thread Nathan Kronenfeld
Hi, Spark community.

We are trying to upgrade our application from spark 2.3.1 to 2.4.3, and
came across a weird problem.

We are using Gradle for dependency management.

Spark depends on twitter-chill, which depends on kryo-shaded. All of our
dependencies on kryo-shaded come through the Twitter chill artifacts, and all
of them request version 4.0.2.

Gradle, however, in its infinite wisdom, pulls version 3.0.3 instead, and
that won't even compile.

dependencyInsight returns the following:

com.esotericsoftware:kryo-shaded:3.0.3 (selected by rule)
   variant "runtime" [
  org.gradle.status = release (not requested)
  Requested attributes not found in the selected variant:
 org.gradle.usage  = java-api
   ]

com.esotericsoftware:kryo-shaded:4.0.2 -> 3.0.3
+--- com.twitter:chill-java:0.9.3
|    +--- org.apache.spark:spark-core_2.11:2.4.3
|    |    +--- compileClasspath
|    |    +--- org.apache.spark:spark-mllib_2.11:2.4.3
|    |    |    \--- compileClasspath
|    |    +--- org.apache.spark:spark-sql_2.11:2.4.3
|    |    |    +--- compileClasspath
|    |    |    \--- org.apache.spark:spark-mllib_2.11:2.4.3 (*)
|    |    +--- org.apache.spark:spark-catalyst_2.11:2.4.3
|    |    |    \--- org.apache.spark:spark-sql_2.11:2.4.3 (*)
|    |    +--- org.apache.spark:spark-streaming_2.11:2.4.3
|    |    |    \--- org.apache.spark:spark-mllib_2.11:2.4.3 (*)
|    |    \--- org.apache.spark:spark-graphx_2.11:2.4.3
|    |         \--- org.apache.spark:spark-mllib_2.11:2.4.3 (*)
|    \--- com.twitter:chill_2.11:0.9.3
|         +--- org.apache.spark:spark-core_2.11:2.4.3 (*)
|         \--- org.apache.spark:spark-unsafe_2.11:2.4.3
|              +--- org.apache.spark:spark-catalyst_2.11:2.4.3 (*)
|              \--- org.apache.spark:spark-core_2.11:2.4.3 (*)
\--- com.twitter:chill_2.11:0.9.3 (*)

I presume this means that Gradle can't find the property/value
org.gradle.usage=java-api in kryo-shaded version 4.0.2, but it can in 3.0.3?

Does anyone know why this might occur? I see no reference to
org.gradle.usage in either our or spark's build files, so (assuming I even
understand the problem correctly) I have no idea where this requirement is
coming from.

We can work around the problem by setting the kryo-shaded version
explicitly, but that means we would have to keep updating it by hand as we
upgrade in the future, which is not ideal.

I realize this is likely (though not certainly) a gradle, not a spark,
problem, but I'm hoping someone else here has encountered this before?

Thanks in advance,
        -Nathan Kronenfeld


Re: conflicting version question

2018-10-26 Thread Nathan Kronenfeld
Thanks for the suggestion.

Ouch.  That looks painful.

On Fri, Oct 26, 2018 at 1:28 PM Anastasios Zouzias 
wrote:

> Hi Nathan,
>
> You can try to shade the dependency version that you want to use. That
> said, shading is a tricky technique. Good luck.
>
>
> https://softwareengineering.stackexchange.com/questions/297276/what-is-a-shaded-java-dependency
>
>
> See also elasticsearch's discussion on shading
>
> https://www.elastic.co/de/blog/to-shade-or-not-to-shade
>
> Best,
> Anastasios
>
>
> On Fri, 26 Oct 2018, 15:45 Nathan Kronenfeld,
>  wrote:
>
>> Our code is currently using Gson 2.8.5.  Spark, through Hadoop-API, pulls
>> in Gson 2.2.4.
>>
>> At the moment, we just get "method X not found" exceptions because of
>> this - because when we run in Spark, 2.2.4 is what gets loaded.
>>
>> Is there any way to have both versions exist simultaneously? To load
>> 2.8.5 so that our code uses it, without messing up spark?
>>
>> Thanks,
>>   -Nathan Kronenfeld
>>
>


conflicting version question

2018-10-26 Thread Nathan Kronenfeld
Our code is currently using Gson 2.8.5.  Spark, through Hadoop-API, pulls
in Gson 2.2.4.

At the moment, we just get "method X not found" exceptions because of this
- because when we run in Spark, 2.2.4 is what gets loaded.

Is there any way to have both versions exist simultaneously? To load 2.8.5
so that our code uses it, without messing up spark?

Thanks,
      -Nathan Kronenfeld


Re: RepartitionByKey Behavior

2018-06-22 Thread Nathan Kronenfeld
>
> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit 
>>> wrote:
>>>
 Hi

 I have been trying to do this simple operation.  I want to land all
 values with one key in the same partition, and not have any different key
 in the same partition.  Is this possible?  I am getting b and c always
 getting mixed up in the same partition.



I think you could do something approximately like:

    val keys = rdd.map(_.getKey).distinct.zipWithIndex
    val numKeys = keys.map(_._2).count.toInt
    rdd.map(r => (r.getKey, r))
      .join(keys)                                           // (key, (record, index))
      .map { case (_, (record, index)) => (index, record) } // re-key by the index
      .partitionBy(new Partitioner {
        def numPartitions = numKeys
        def getPartition(key: Any) = key.asInstanceOf[Long].toInt
      })

i.e., key by a unique number, count that, and repartition by key to the exact
count.  This presumes, of course, that the number of keys is small enough to
serve as a partition count.

Re: Building SparkML vectors from long data

2018-06-12 Thread Nathan Kronenfeld
I don't know if this is the best way or not, but:

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.functions.{col, collect_set, udf}

val indexer = new StringIndexer().setInputCol("vr").setOutputCol("vrIdx")
val indexModel = indexer.fit(data)
val indexedData = indexModel.transform(data)
val variables = indexModel.labels.length

val toSeq = udf((a: Double, b: Double) => Seq(a, b))
val toVector = udf((seq: Seq[Seq[Double]]) => {
  new SparseVector(variables, seq.map(_(0).toInt).toArray, seq.map(_(1)).toArray)
})
val result = indexedData
  .withColumn("val", toSeq(col("vrIdx"), col("value")))
  .groupBy("ID")
  .agg(collect_set(col("val")).name("collected_val"))
  .withColumn("collected_val",
    toVector(col("collected_val")).as[Row](Encoders.javaSerialization(classOf[Row])))


at least works.  The indices still aren't in order in the vector - I don't
know if this matters much, but if it does, it's easy enough to sort them in
toVector (and to remove duplicates)


On Tue, Jun 12, 2018 at 2:24 PM, Patrick McCarthy 
wrote:

> I work with a lot of data in a long format, cases in which an ID column is
> repeated, followed by a variable and a value column like so:
>
> +----+-----+-------+
> | ID | var | value |
> +----+-----+-------+
> | A  | v1  | 1.0   |
> | A  | v2  | 2.0   |
> | B  | v1  | 1.5   |
> | B  | v3  | -1.0  |
> +----+-----+-------+
>
> It seems to me that Spark doesn't provide any clear native way to
> transform data of this format into a Vector() or VectorUDT() type suitable
> for machine learning algorithms.
>
> The best solution I've found so far (which isn't very good) is to group by
> ID, perform a collect_list, and then use a UDF to translate the resulting
> array into a vector datatype.
>
> I can get kind of close like so:
>
> indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')
>
> (indexed_df
>  .withColumn('val', F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
>                              F.lit(':'), F.col('value')))
>  .groupBy('ID')
>  .agg(F.collect_set('val'))
> )
>
> But the resultant 'val' vector is out of index order, and still would need
> to be parsed.
>
> What's the current preferred way to solve a problem like this?
>


select with more than 5 typed columns

2018-01-08 Thread Nathan Kronenfeld
Looking in Dataset, there are select functions taking from 1 to 5
TypedColumn arguments.

Is there a built-in way to pull out more than 5 typed columns into a
Dataset (without having to resort to using a DataFrame, or manual
processing of the RDD)?
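
(Not a built-in answer, but one workaround sketch that stays in Dataset-land
is to project the fields with a typed map instead of TypedColumns; the case
classes here are purely illustrative:)

import org.apache.spark.sql.{Dataset, SparkSession}

case class Wide(a: Int, b: Int, c: Int, d: Int, e: Int, f: Int, g: Int)
case class Narrow(a: Int, b: Int, c: Int, d: Int, e: Int, f: Int)

// pull six of the seven fields out of a Dataset[Wide] without going via DataFrame
def project(spark: SparkSession, wide: Dataset[Wide]): Dataset[Narrow] = {
  import spark.implicits._  // provides the Encoder[Narrow]
  wide.map(w => Narrow(w.a, w.b, w.c, w.d, w.e, w.f))
}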

Thanks,
 - Nathan Kronenfeld
 - Uncharted Software


Re: Apache Spark-Subtract two datasets

2017-10-13 Thread Nathan Kronenfeld
I think you want a join of type "left_anti"... See below log

scala> import spark.implicits._
import spark.implicits._

scala> case class Foo (a: String, b: Int)
defined class Foo

scala> case class Bar (a: String, d: Double)
defined class Bar

scala> var fooDs = Seq(Foo("a", 1), Foo("b", 2), Foo("c", 3)).toDS
fooDs: org.apache.spark.sql.Dataset[Foo] = [a: string, b: int]

scala> var barDs = Seq(Bar("b", 2.1), Bar("c", 3.2), Bar("d", 4.3)).toDS
barDs: org.apache.spark.sql.Dataset[Bar] = [a: string, d: double]

scala> fooDs.join(barDs, Seq("a"), "left_anti").collect.foreach(println)
[a,1]


On Thu, Oct 12, 2017 at 1:16 PM, Shashikant Kulkarni <
shashikant.kulka...@gmail.com> wrote:

> Hello,
>
> I have 2 datasets of two different record types. I want the list of
> records which are in the first Dataset but not in the second. How can I
> do this in Apache Spark using the Java Connector? I am using Apache
> Spark 2.2.0
>
> Thank you
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: isCached

2017-09-01 Thread Nathan Kronenfeld
Thanks for the info

On Fri, Sep 1, 2017 at 12:06 PM, Nick Pentreath 
wrote:

> No unfortunately not - as i recall storageLevel accesses some private
> methods to get the result.
>
> On Fri, 1 Sep 2017 at 17:55, Nathan Kronenfeld 
> 
> wrote:
>
>> Ah, in 2.1.0.
>>
>> I'm in 2.0.1 at the moment... is there any way that works that far back?
>>
>> On Fri, Sep 1, 2017 at 11:46 AM, Nick Pentreath > > wrote:
>>
>>> Dataset does have storageLevel. So you can use isCached = (storageLevel
>>> != StorageLevel.NONE) as a test.
>>>
>>> Arguably isCached could be added to dataset too, shouldn't be a
>>> controversial change.
>>>
>>> On Fri, 1 Sep 2017 at 17:31, Nathan Kronenfeld 
>>> 
>>> wrote:
>>>
>>>> I'm currently porting some of our code from RDDs to Datasets.
>>>>
>>>> With RDDs it's pretty easy to figure out if they are cached or not.
>>>>
>>>> I notice that the catalog has a function for determining this on
>>>> Datasets too, but it's private[sql].  Is there any reason for it not to be
>>>> public?  Is there any way at the moment to determine if a dataset is cached
>>>> or not?
>>>>
>>>> Thanks in advance
>>>>-Nathan Kronenfeld
>>>>
>>>
>>


Re: isCached

2017-09-01 Thread Nathan Kronenfeld
Ah, in 2.1.0.

I'm in 2.0.1 at the moment... is there any way that works that far back?

On Fri, Sep 1, 2017 at 11:46 AM, Nick Pentreath 
wrote:

> Dataset does have storageLevel. So you can use isCached = (storageLevel !=
> StorageLevel.NONE) as a test.
>
> Arguably isCached could be added to dataset too, shouldn't be a
> controversial change.
>
> On Fri, 1 Sep 2017 at 17:31, Nathan Kronenfeld 
> 
> wrote:
>
>> I'm currently porting some of our code from RDDs to Datasets.
>>
>> With RDDs it's pretty easy to figure out if they are cached or not.
>>
>> I notice that the catalog has a function for determining this on Datasets
>> too, but it's private[sql].  Is there any reason for it not to be public?
>> Is there any way at the moment to determine if a dataset is cached or not?
>>
>> Thanks in advance
>>-Nathan Kronenfeld
>>
>
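
(For reference, a minimal sketch of the storageLevel test Nick describes,
available from Spark 2.1.0 on:)

import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel

// true if the Dataset has been persisted at any storage level
def isCached(ds: Dataset[_]): Boolean = ds.storageLevel != StorageLevel.NONE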


isCached

2017-09-01 Thread Nathan Kronenfeld
I'm currently porting some of our code from RDDs to Datasets.

With RDDs it's pretty easy to figure out if they are cached or not.

I notice that the catalog has a function for determining this on Datasets
too, but it's private[sql].  Is there any reason for it not to be public?
Is there any way at the moment to determine if a dataset is cached or not?

Thanks in advance
       -Nathan Kronenfeld


CSV conversion

2016-10-26 Thread Nathan Kronenfeld
We are finally converting from Spark 1.6 to Spark 2.0, and are finding one
barrier we can't get past.

In the past, we converted CSV RDDs (not files) to DataFrames using the
Databricks spark-csv library - creating a CsvParser and calling
parser.csvRdd.

The current incarnation of spark-csv seems only to have a CSV file format
exposed, and the only entry points we can find are when reading files.

What is the modern pattern for converting an already-read RDD of CSV lines
into a dataframe?

Thanks,
    Nathan Kronenfeld
Uncharted Software
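
(For what it's worth - not an answer from the thread - later Spark versions
(2.2+) let DataFrameReader.csv read a Dataset[String] directly, which gives a
path from an already-read RDD of CSV lines; a sketch, where csvLines is an
assumed RDD[String]:)

import spark.implicits._  // for rdd.toDS()

val df = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .csv(csvLines.toDS())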


Graph testing question

2015-12-01 Thread Nathan Kronenfeld
I'm trying to test some graph operations I've written using GraphX.

To make sure I catch all appropriate test cases, I'm trying to specify an
input graph that is partitioned a specific way.

Unfortunately, it seems graphx.Graph repartitions and shuffles any input
node and edge RDD I give it.

Is there a way to construct a graph so that it uses the partitions given
and doesn't shuffle everything around?

Thanks,
   -Nathan Kronenfeld


running spark on yarn

2015-05-21 Thread Nathan Kronenfeld
Hello, folks.

We just recently switched to using Yarn on our cluster (when upgrading to
cloudera 5.4.1)

I'm trying to run a spark job from within a broader application (a web
service running on Jetty), so I can't just start it using spark-submit.

Does anyone know of an instructions page on how to do that under yarn?
I've managed to get it mostly there by including all spark, yarn, hadoop,
and hdfs config files in my SparkConf (somewhat indirectly, and that is a
bit of a short-hand). The job now shows up under yarn and has its own
application web ui page, but it's not showing up under the main spark page,
and it's still missing some things (for instance, it can't find the native
library for snappy compression), so I still think I'm doing something wrong.

Any help or hints would be much appreciated.


Thanks,
-Nathan


Re:

2015-03-25 Thread Nathan Kronenfeld
What would it do with the following dataset?

(A, B)
(A, C)
(B, D)

On Wed, Mar 25, 2015 at 1:02 PM, Himanish Kushary 
wrote:

> Hi,
>
> I have a RDD of pairs of strings like below :
>
> (A,B)
> (B,C)
> (C,D)
> (A,D)
> (E,F)
> (B,F)
>
> I need to transform/filter this into a RDD of pairs that does not repeat a
> string once it has been used once. So something like ,
>
> (A,B)
> (C,D)
> (E,F)
>
> (B,C) is out because B has already been used in (A,B), (A,D) is out because
> A (and D) has been used, etc.
>
> I was thinking of a option of using a shared variable to keep track of
> what has already been used but that may only work for a single partition
> and would not scale for larger dataset.
>
> Is there any other efficient way to accomplish this ?
>
> --
> Thanks & Regards
> Himanish
>


Re: Adding a column to a SchemaRDD

2014-12-15 Thread Nathan Kronenfeld
Perfect, that's exactly what I was looking for.

Thank you!

On Mon, Dec 15, 2014 at 3:32 AM, Yanbo Liang  wrote:
>
> Hi Nathan,
>
> #1
>
> Spark SQL & DSL can satisfy your requirement. You can refer the following
> code snippet:
>
> jdata.select(Star(None), 'seven.getField("mod"), 'eleven.getField("mod"))
>
> You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.
>
> #2
>
> After you make the transform above, you do not need to make SchemaRDD
> manually.
> Because that jdata.select() return a SchemaRDD and you can operate on it
> directly.
>
> For example, the following code snippet will return a new SchemaRDD with
> longer Row:
>
> val t1 = jdata.select(Star(None), 'seven.getField("mod") +
> 'eleven.getField("mod")  as 'mod_sum)
>
> You can use t1.printSchema() to print the schema of this SchemaRDD and
> check whether it satisfy your requirements.
>
>
>
> 2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld :
>>
>> (1) I understand about immutability, that's why I said I wanted a new
>> SchemaRDD.
>> (2) I specifically asked for a non-SQL solution that takes a SchemaRDD,
>> and results in a new SchemaRDD with one new function.
>> (3) The DSL stuff is a big clue, but I can't find adequate documentation
>> for it
>>
>> What I'm looking for is something like:
>>
>> import org.apache.spark.sql._
>>
>>
>> val sqlc = new SQLContext(sc)
>> import sqlc._
>>
>>
>> val data = sc.parallelize(0 to 99).map(n =>
>> ("{\"seven\": {\"mod\": %d, \"times\": %d}, "+
>>   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n
>> % 11, n * 11))
>> val jdata = sqlc.jsonRDD(data)
>> jdata.registerTempTable("jdata")
>>
>>
>> val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum
>> FROM jdata")
>>
>>
>> This sqlVersion works fine, but if I try to do the same thing with a
>> programatic function, I'm missing a bunch of pieces:
>>
>>- I assume I'd need to start with something like:
>>jdata.select('*, 'seven.mod, 'eleven.mod)
>>and then get and process the last two elements.  The problems are:
>>   - I can't select '* - there seems no way to get the complete row
>>   - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
>>   seems only one deep.
>>- Assuming I could do that, I don't see a way to make the result into
>>a SchemaRDD.  I assume I would have to do something like:
>>   1. take my row and value, and create a new, slightly longer row
>>   2. take my old schema, and create a new schema with one more field
>>   at the end, named and typed appropriately
>>   3. combine the two into a SchemaRDD
>>   I think I see how to do 3, but 1 and 2 elude me.
>>
>> Is there more complete documentation somewhere for the DSL portion?
>> Anyone have a clue about any of the above?
>>
>>
>>
>> On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang 
>> wrote:
>>
>>> RDD is immutable so you can not modify it.
>>> If you want to modify some value or schema in RDD,  using map to
>>> generate a new RDD.
>>> The following code for your reference:
>>>
>>> def add(a:Int,b:Int):Int = {
>>>   a + b
>>> }
>>>
>>> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
>>> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
>>> d2.foreach(println)
>>>
>>>
>>> Otherwise, if your self-defining function is straightforward and you can
>>> represent it by SQL, using Spark SQL or DSL is also a good choice.
>>>
>>> case class Person(id: Int, score: Int, value: Int)
>>>
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>
>>> import sqlContext._
>>>
>>> val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
>>> val d2 = d1.select('id, 'score, 'id + 'score)
>>> d2.foreach(println)
>>>
>>>
>>> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld >> >:
>>>
>>>> Hi, there.
>>>>
>>>> I'm trying to understand how to augment data in a SchemaRDD.
>>>>
>> I can see how to do it if I can express the added values in SQL - just
>>

Re: Adding a column to a SchemaRDD

2014-12-12 Thread Nathan Kronenfeld
(1) I understand about immutability, that's why I said I wanted a new
SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD, and
results in a new SchemaRDD with one new function.
(3) The DSL stuff is a big clue, but I can't find adequate documentation
for it

What I'm looking for is something like:

import org.apache.spark.sql._


val sqlc = new SQLContext(sc)
import sqlc._


val data = sc.parallelize(0 to 99).map(n =>
  ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
val jdata = sqlc.jsonRDD(data)
jdata.registerTempTable("jdata")


val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")


This sqlVersion works fine, but if I try to do the same thing with a
programatic function, I'm missing a bunch of pieces:

   - I assume I'd need to start with something like:
   jdata.select('*, 'seven.mod, 'eleven.mod)
   and then get and process the last two elements.  The problems are:
  - I can't select '* - there seems no way to get the complete row
  - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
  seems only one deep.
   - Assuming I could do that, I don't see a way to make the result into a
   SchemaRDD.  I assume I would have to do something like:
  1. take my row and value, and create a new, slightly longer row
  2. take my old schema, and create a new schema with one more field at
  the end, named and typed appropriately
  3. combine the two into a SchemaRDD
  I think I see how to do 3, but 1 and 2 elude me.

Is there more complete documentation somewhere for the DSL portion? Anyone
have a clue about any of the above?



On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang  wrote:

> RDD is immutable so you can not modify it.
> If you want to modify some value or schema in RDD,  using map to generate
> a new RDD.
> The following code for your reference:
>
> def add(a:Int,b:Int):Int = {
>   a + b
> }
>
> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
> d2.foreach(println)
>
>
> Otherwise, if your self-defining function is straightforward and you can
> represent it by SQL, using Spark SQL or DSL is also a good choice.
>
> case class Person(id: Int, score: Int, value: Int)
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext._
>
> val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
> val d2 = d1.select('id, 'score, 'id + 'score)
> d2.foreach(println)
>
>
> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld :
>
>> Hi, there.
>>
>> I'm trying to understand how to augment data in a SchemaRDD.
>>
>> I can see how to do it if I can express the added values in SQL - just run
>> "SELECT *,valueCalculation AS newColumnName FROM table"
>>
>> I've been searching all over for how to do this if my added value is a
>> scala function, with no luck.
>>
>> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add
>> a new column, D, calculated using Utility.process(b, c), and I want (of
>> course) to pass in the value B and C from each row, ending up with a new
>> SchemaRDD with columns A, B, C, and D.
>>
>> Is this possible? If so, how?
>>
>> Thanks,
>>-Nathan
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Adding a column to a SchemaRDD

2014-12-11 Thread Nathan Kronenfeld
Hi, there.

I'm trying to understand how to augment data in a SchemaRDD.

I can see how to do it if I can express the added values in SQL - just run
"SELECT *,valueCalculation AS newColumnName FROM table"

I've been searching all over for how to do this if my added value is a
scala function, with no luck.

Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
new column, D, calculated using Utility.process(b, c), and I want (of
course) to pass in the value B and C from each row, ending up with a new
SchemaRDD with columns A, B, C, and D.

Is this possible? If so, how?

Thanks,
       -Nathan

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Alternatives to groupByKey

2014-12-03 Thread Nathan Kronenfeld
I think it would depend on the type and amount of information you're
collecting.

If you're just trying to collect small numbers for each window, and don't
have an overwhelming number of windows, you might consider using
accumulators.  Just make one per value per time window, and for each data
point, add it to the accumulators for the time windows in which it
belongs.  We've found this approach a lot faster than anything involving a
shuffle.  This should work fine for stuff like max(), min(), and mean()

If you're collecting enough data that accumulators are impractical, I think
I would try multiple passes.  Cache your data, and for each pass, filter to
that window, and perform all your operations on the filtered RDD.  Because
of the caching, it won't be significantly slower than processing it all at
once - in fact, it will probably be a lot faster, because the shuffles are
shuffling less information.  This is similar to what you're suggesting
about partitioning your rdd, but probably simpler and easier.
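
(A sketch of the multi-pass approach, under the same assumed events/windows
shapes as above:)

import org.apache.spark.SparkContext._  // double/pair RDD implicits on older Spark

val cached = events.cache()

val statsByWindow = windows.map { case (start, end) =>
  val inWindow = cached.filter { case (time, _) => start <= time && time < end }
  // StatCounter gives count, mean, max, min, etc. in one pass over the window
  (start, end) -> inWindow.values.stats()
}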

That being said, your restriction 3 seems to be in contradiction to the
rest of your request - if your aggregation needs to be able to look at all
the data at once, then that seems contradictory to viewing the data through
an RDD.  Could you explain a bit more what you mean by that?

-Nathan


On Wed, Dec 3, 2014 at 4:26 PM, ameyc  wrote:

> Hi,
>
> So my Spark app needs to run a sliding window through a time series dataset
> (I'm not using Spark streaming). And then run different types on
> aggregations on per window basis. Right now I'm using a groupByKey() which
> gives me Iterables for each window. There are a few concerns I have with
> this approach:
>
> 1. groupByKey() could potentially fail for a key not fitting in the memory.
> 2. I'd like to run aggregations like max(), mean() on each of the groups,
> it'd be nice to have the RDD functionality at this point instead of the
> iterables.
> 3. I can't use reduceByKey() or aggregateByKey() are some of my
> aggregations
> need to have a view of the entire window.
>
> Only other way I could think of is partitioning my RDDs into multiple RDDs
> with each RDD representing a window. Is this a sensible approach? Or is
> there any other way of going about this?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Another accumulator question

2014-11-21 Thread Nathan Kronenfeld
I"m not sure if it's an exact match, or just very close :-)

I don't think our problem is the workload on the driver, I think it's just
memory - so while the solution proposed there would work, it would also be
sufficient for our purposes, I believe, simply to clear each block as soon
as it's added into the canonical version, and try to do so as soon as
possible - but I could be misunderstanding some of the timing, I'm still
investigating.

Though to combine on the worker before returning, as he suggests, would
probably be even better.

On Fri, Nov 21, 2014 at 6:08 PM, Andrew Ash  wrote:

> Hi Nathan,
>
> It sounds like what you're asking for has already been filed as
> https://issues.apache.org/jira/browse/SPARK-664  Does that ticket match
> what you're proposing?
>
> Andrew
>
> On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> We've done this with reduce - that definitely works.
>>
>> I've reworked the logic to use accumulators because, when it works, it's
>> 5-10x faster
>>
>> On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen  wrote:
>>
>>> This sounds more like a use case for reduce? or fold? it sounds like
>>> you're kind of cobbling together the same function on accumulators,
>>> when reduce/fold are simpler and have the behavior you suggest.
>>>
>>> On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
>>>  wrote:
>>> > I think I understand what is going on here, but I was hoping someone
>>> could
>>> > confirm (or explain reality if I don't) what I'm seeing.
>>> >
>>> > We are collecting data using a rather sizable accumulator -
>>> essentially, an
>>> > array of tens of thousands of entries.  All told, about 1.3m of data.
>>> >
>>> > If I understand things correctly, it looks to me like, when our job is
>>> done,
>>> > a copy of this array is retrieved from each individual task, all at
>>> once,
>>> > for combination on the client - which means, with 400 tasks to the
>>> job, each
>>> > collection is using up half a gig of memory on the client.
>>> >
>>> > Is this true?  If so, does anyone know a way to get accumulators to
>>> > accumulate as results collect, rather than all at once at the end, so
>>> we
>>> > only have to hold a few in memory at a time, rather than all 400?
>>> >
>>> > Thanks,
>>> >   -Nathan
>>> >
>>> >
>>> > --
>>> > Nathan Kronenfeld
>>> > Senior Visualization Developer
>>> > Oculus Info Inc
>>> > 2 Berkeley Street, Suite 600,
>>> > Toronto, Ontario M5A 4J5
>>> > Phone:  +1-416-203-3003 x 238
>>> > Email:  nkronenf...@oculusinfo.com
>>>
>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Another accumulator question

2014-11-21 Thread Nathan Kronenfeld
We've done this with reduce - that definitely works.

I've reworked the logic to use accumulators because, when it works, it's
5-10x faster

On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen  wrote:

> This sounds more like a use case for reduce? or fold? it sounds like
> you're kind of cobbling together the same function on accumulators,
> when reduce/fold are simpler and have the behavior you suggest.
>
> On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
>  wrote:
> > I think I understand what is going on here, but I was hoping someone
> could
> > confirm (or explain reality if I don't) what I'm seeing.
> >
> > We are collecting data using a rather sizable accumulator - essentially,
> an
> > array of tens of thousands of entries.  All told, about 1.3m of data.
> >
> > If I understand things correctly, it looks to me like, when our job is
> done,
> > a copy of this array is retrieved from each individual task, all at once,
> > for combination on the client - which means, with 400 tasks to the job,
> each
> > collection is using up half a gig of memory on the client.
> >
> > Is this true?  If so, does anyone know a way to get accumulators to
> > accumulate as results collect, rather than all at once at the end, so we
> > only have to hold a few in memory at a time, rather than all 400?
> >
> > Thanks,
> >   -Nathan
> >
> >
> > --
> > Nathan Kronenfeld
> > Senior Visualization Developer
> > Oculus Info Inc
> > 2 Berkeley Street, Suite 600,
> > Toronto, Ontario M5A 4J5
> > Phone:  +1-416-203-3003 x 238
> > Email:  nkronenf...@oculusinfo.com
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Another accumulator question

2014-11-20 Thread Nathan Kronenfeld
I think I understand what is going on here, but I was hoping someone could
confirm (or explain reality if I don't) what I'm seeing.

We are collecting data using a rather sizable accumulator - essentially, an
array of tens of thousands of entries.  All told, about 1.3m of data.

If I understand things correctly, it looks to me like, when our job is
done, a copy of this array is retrieved from each individual task, all at
once, for combination on the client - which means, with 400 tasks to the
job, each collection is using up half a gig of memory on the client.

Is this true?  If so, does anyone know a way to get accumulators to
accumulate as results collect, rather than all at once at the end, so we
only have to hold a few in memory at a time, rather than all 400?

Thanks,
  -Nathan


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: data locality, task distribution

2014-11-13 Thread Nathan Kronenfeld
I am seeing skewed execution times.  As far as I can tell, they are
attributable to differences in data locality - tasks with locality
PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.

This seems entirely as it should be - the question is, why the different
locality levels?

I am seeing skewed caching, as I mentioned before - in the case I isolated,
with 4 nodes, they were distributed at about 42%, 31%, 20%, and 6%.
However, the total amount was significantly less than the memory of any
single node, so I don't think they could have overpopulated their cache.  I
am occasionally seeing task failures, but they re-execute themselves, and
work fine the next time.  Yet I'm still seeing incomplete caching (from 65%
cached up to 100%, depending on the run).

I shouldn't have much variance in task time - this is simply a foreach over
the data, adding to an accumulator, and the data is completely randomly
distributed, so should be pretty even overall.

I am seeing GC regressions occasionally - they slow a request from about 2
seconds to about 5 seconds.  The 8 minute slowdown seems to be solely
attributable to the data locality issue, as far as I can tell.  There was
some further confusion though in the times I mentioned - the list I gave
(3.1 min, 2 seconds, ... 8 min) were not different runs with different
cache %s, they were iterations within a single run with 100% caching.

   -Nathan



On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson  wrote:

> Spark's scheduling is pretty simple: it will allocate tasks to open cores
> on executors, preferring ones where the data is local. It even performs
> "delay scheduling", which means waiting a bit to see if an executor where
> the data resides locally becomes available.
>
> Are yours tasks seeing very skewed execution times? If some tasks are
> taking a very long time and using all the resources on a node, perhaps the
> other nodes are quickly finishing many tasks, and actually overpopulating
> their caches. If a particular machine were not overpopulating its cache,
> and there are no failures, then you should see 100% cached after the first
> run.
>
> It's also strange that running totally uncached takes 3.1 minutes, but
> running 80-90% cached may take 8 minutes. Does your workload produce
> nondeterministic variance in task times? Was it a single straggler, or many
> tasks, that was keeping the job from finishing? It's not too uncommon to
> see occasional performance regressions while caching due to GC, though 2
> seconds to 8 minutes is a bit extreme.
>
> On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> Sorry, I think I was not clear in what I meant.
>> I didn't mean it went down within a run, with the same instance.
>>
>> I meant I'd run the whole app, and one time, it would cache 100%, and the
>> next run, it might cache only 83%
>>
>> Within a run, it doesn't change.
>>
>> On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson 
>> wrote:
>>
>>> The fact that the caching percentage went down is highly suspicious. It
>>> should generally not decrease unless other cached data took its place, or
>>> if unless executors were dying. Do you know if either of these were the
>>> case?
>>>
>>> On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld <
>>> nkronenf...@oculusinfo.com> wrote:
>>>
>>>> Can anyone point me to a good primer on how spark decides where to send
>>>> what task, how it distributes them, and how it determines data locality?
>>>>
>>>> I'm trying a pretty simple task - it's doing a foreach over cached
>>>> data, accumulating some (relatively complex) values.
>>>>
>>>> So I see several inconsistencies I don't understand:
>>>>
>>>> (1) If I run it a couple times, as separate applications (i.e.,
>>>> reloading, recaching, etc), I will get different %'s cached each time.
>>>> I've got about 5x as much memory as I need overall, so it isn't running
>>>> out.  But one time, 100% of the data will be cached; the next, 83%, the
>>>> next, 92%, etc.
>>>>
>>>> (2) Also, the data is very unevenly distributed. I've got 400
>>>> partitions, and 4 workers (with, I believe, 3x replication), and on my last
>>>> run, my distribution was 165/139/25/71.  Is there any way to get spark to
>>>> distribute the tasks more evenly?
>>>>
>>>> (3) If I run the problem several times in the same execution (to take
>>>> advantage of caching etc.), I get very inconsistent r

Re: data locality, task distribution

2014-11-12 Thread Nathan Kronenfeld
Sorry, I think I was not clear in what I meant.
I didn't mean it went down within a run, with the same instance.

I meant I'd run the whole app, and one time, it would cache 100%, and the
next run, it might cache only 83%

Within a run, it doesn't change.

On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson  wrote:

> The fact that the caching percentage went down is highly suspicious. It
> should generally not decrease unless other cached data took its place, or
> if unless executors were dying. Do you know if either of these were the
> case?
>
> On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> Can anyone point me to a good primer on how spark decides where to send
>> what task, how it distributes them, and how it determines data locality?
>>
>> I'm trying a pretty simple task - it's doing a foreach over cached data,
>> accumulating some (relatively complex) values.
>>
>> So I see several inconsistencies I don't understand:
>>
>> (1) If I run it a couple times, as separate applications (i.e.,
>> reloading, recaching, etc), I will get different %'s cached each time.
>> I've got about 5x as much memory as I need overall, so it isn't running
>> out.  But one time, 100% of the data will be cached; the next, 83%, the
>> next, 92%, etc.
>>
>> (2) Also, the data is very unevenly distributed. I've got 400 partitions,
>> and 4 workers (with, I believe, 3x replication), and on my last run, my
>> distribution was 165/139/25/71.  Is there any way to get spark to
>> distribute the tasks more evenly?
>>
>> (3) If I run the problem several times in the same execution (to take
>> advantage of caching etc.), I get very inconsistent results.  My latest
>> try, I get:
>>
>>- 1st run: 3.1 min
>>- 2nd run: 2 seconds
>>- 3rd run: 8 minutes
>>- 4th run: 2 seconds
>>- 5th run: 2 seconds
>>- 6th run: 6.9 minutes
>>- 7th run: 2 seconds
>>- 8th run: 2 seconds
>>- 9th run: 3.9 minutes
>>- 10th run: 8 seconds
>>
>> I understand the difference for the first run; it was caching that time.
>> Later times, when it manages to work in 2 seconds, it's because all the
>> tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
>> tasks end up with locality level ANY.  Why would that change when running
>> the exact same task twice in a row on cached data?
>>
>> Any help or pointers that I could get would be much appreciated.
>>
>>
>> Thanks,
>>
>>  -Nathan
>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


data locality, task distribution

2014-11-11 Thread Nathan Kronenfeld
Can anyone point me to a good primer on how spark decides where to send
what task, how it distributes them, and how it determines data locality?

I'm trying a pretty simple task - it's doing a foreach over cached data,
accumulating some (relatively complex) values.

So I see several inconsistencies I don't understand:

(1) If I run it a couple times, as separate applications (i.e., reloading,
recaching, etc), I will get different %'s cached each time.  I've got about
5x as much memory as I need overall, so it isn't running out.  But one
time, 100% of the data will be cached; the next, 83%, the next, 92%, etc.

(2) Also, the data is very unevenly distributed. I've got 400 partitions,
and 4 workers (with, I believe, 3x replication), and on my last run, my
distribution was 165/139/25/71.  Is there any way to get spark to
distribute the tasks more evenly?

(3) If I run the problem several times in the same execution (to take
advantage of caching etc.), I get very inconsistent results.  My latest
try, I get:

   - 1st run: 3.1 min
   - 2nd run: 2 seconds
   - 3rd run: 8 minutes
   - 4th run: 2 seconds
   - 5th run: 2 seconds
   - 6th run: 6.9 minutes
   - 7th run: 2 seconds
   - 8th run: 2 seconds
   - 9th run: 3.9 minutes
   - 10th run: 8 seconds

I understand the difference for the first run; it was caching that time.
Later times, when it manages to work in 2 seconds, it's because all the
tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
tasks end up with locality level ANY.  Why would that change when running
the exact same task twice in a row on cached data?

Any help or pointers that I could get would be much appreciated.


Thanks,

 -Nathan



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


wierd caching

2014-11-08 Thread Nathan Kronenfeld
RDD Name:           8
Storage Level:      Memory Deserialized 1x Replicated
Cached Partitions:  426
Fraction Cached:    107%
Size in Memory:     59.7 GB
Size in Tachyon:    0.0 B
Size on Disk:       0.0 B

Anyone understand what it means to have more than 100% of an rdd cached?

Thanks,
-Nathan


Re: rdd caching and use thereof

2014-10-16 Thread Nathan Kronenfeld
Oh, I forgot - I've set the following parameters at the moment (besides the
standard location, memory, and core setup):

spark.logConf  true
spark.shuffle.consolidateFiles true
spark.ui.port  4042
spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
spark.shuffle.file.buffer.kb   500
spark.speculation  true



On Fri, Oct 17, 2014 at 2:46 AM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> I'm trying to understand two things about how spark is working.
>
> (1) When I try to cache an rdd that fits well within memory (about 60g
> with about 600g of memory), I get seemingly random levels of caching, from
> around 60% to 100%, given the same tuning parameters.  What governs how
> much of an RDD gets cached when there is enough memory?
>
> (2) Even when cached, when I run some tasks over the data, I get various
> locality states.  Sometimes it works perfectly, with everything
> PROCESS_LOCAL, and sometimes I get 10-20% of the data on locality ANY (and
> the task takes minutes instead of seconds); often this will vary if I run
> the task twice in a row in the same shell.  Is there anything I can do to
> affect this?  I tried caching with replication, but that caused everything
> to run out of memory nearly instantly (with the same 60g data set in 4-600g
> of memory)
>
> Thanks for the help,
>
> -Nathan
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


rdd caching and use thereof

2014-10-16 Thread Nathan Kronenfeld
I'm trying to understand two things about how spark is working.

(1) When I try to cache an rdd that fits well within memory (about 60g with
about 600g of memory), I get seemingly random levels of caching, from
around 60% to 100%, given the same tuning parameters.  What governs how
much of an RDD gets cached when there is enough memory?

(2) Even when cached, when I run some tasks over the data, I get various
locality states.  Sometimes it works perfectly, with everything
PROCESS_LOCAL, and sometimes I get 10-20% of the data on locality ANY (and
the task takes minutes instead of seconds); often this will vary if I run
the task twice in a row in the same shell.  Is there anything I can do to
affect this?  I tried caching with replication, but that caused everything
to run out of memory nearly instantly (with the same 60g data set in 4-600g
of memory)

Thanks for the help,

-Nathan


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Accumulator question

2014-10-03 Thread Nathan Kronenfeld
I notice that accumulators register themselves with a private Accumulators
object.

I don't notice any way to unregister them when one is done.

Am I missing something? If not, is there any plan for how to free up that
memory?

I've a case where we're gathering data from repeated queries using some
relatively sizable accumulators; at the moment, we're creating one per
query, and running out of memory after far too few queries.

I've tried methods that don't involve accumulators; they involve a shuffle
instead, and take 10x as long.

Thanks,
      -Nathan




-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Spark streaming vs. spark usage

2014-07-28 Thread Nathan Kronenfeld
So after months and months, I finally started to try and tackle this, but
my scala ability isn't up to it.

The problem is that, of course, even with the common interface, we don't
want inter-operability between RDDs and DStreams.

I looked into Monads, as per Ashish's suggestion, and I think I understand
their relevance.  But when done processing, one would still have to pull
out the wrapped object, knowing what it was, and I don't see how to do that.

I'm guessing there is a way to do this in scala, but I'm not seeing it.

In detail, the requirement would be having something on the order of:

abstract class DistributedCollection[T] {
    def map[U](fcn: T => U): DistributedCollection[U]
    ...
}

class RDD[T] extends DistributedCollection[T] {
    // Note the return type that doesn't quite match the interface
    def map[U](fcn: T => U): RDD[U]
    ...
}

class DStream[T] extends DistributedCollection[T] {
    // Note the return type that doesn't quite match the interface
    def map[U](fcn: T => U): DStream[U]
    ...
}

Can anyone point me at a way to do this?
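
(For reference, a minimal sketch of one way Scala can express this - giving
the trait a higher-kinded "self type" parameter; the names here are
illustrative, not an actual Spark API:)

// Each subtype's map returns its own container type, yet both share one interface.
trait DistColl[T, CC[X] <: DistColl[X, CC]] {
  def map[U](fcn: T => U): CC[U]
}

class MyRDD[T](elems: Seq[T]) extends DistColl[T, MyRDD] {
  def map[U](fcn: T => U): MyRDD[U] = new MyRDD(elems.map(fcn))
}

class MyDStream[T](batches: Seq[Seq[T]]) extends DistColl[T, MyDStream] {
  def map[U](fcn: T => U): MyDStream[U] = new MyDStream(batches.map(_.map(fcn)))
}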

Thanks,
 -Nathan



On Thu, Dec 19, 2013 at 1:08 AM, Ashish Rangole  wrote:

> I wonder if it will help to have a generic Monad container that wraps
> either RDD or DStream and provides
> map, flatmap, foreach and filter methods.
>
> case class DataMonad[A](data: A) {
> def map[B]( f : A => B ) : DataMonad[B] = {
>DataMonad( f( data ) )
> }
>
> def flatMap[B]( f : A => DataMonad[B] ) : DataMonad[B] = {
>f( data )
> }
>
> def foreach ...
> def withFilter ...
> :
> :
> etc, something like that
> }
>
> On Wed, Dec 18, 2013 at 10:42 PM, Reynold Xin  wrote:
>
>>
>> On Wed, Dec 18, 2013 at 12:17 PM, Nathan Kronenfeld <
>> nkronenf...@oculusinfo.com> wrote:
>>
>>>
>>>
>>> Since many of the functions exist in parallel between the two, I guess I
>>> would expect something like:
>>>
>>> trait BasicRDDFunctions {
>>> def map...
>>> def reduce...
>>> def filter...
>>> def foreach...
>>> }
>>>
>>> class RDD extends  BasicRDDFunctions...
>>> class DStream extends BasicRDDFunctions...
>>>
>>
>> I like this idea. We should discuss more about it on the dev list. It
>> would require refactoring some APIs, but does lead to better unification.
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Have different reduce key than mapper key

2014-07-23 Thread Nathan Kronenfeld
You do them sequentially in code; Spark will take care of combining them in
execution.
so something like:
foo.map(fcn to [K1, V1])
   .reduceByKey(fcn from (V1, V1) to V1)
   .map(fcn from (K1, V1) to (K2, V2))
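
(To make that concrete, a small sketch with illustrative types - reduce per
user id, then re-key the result by country; the input RDD is an assumption:)

import org.apache.spark.rdd.RDD

// input: (userId, (country, amount))
def totalsByCountry(input: RDD[(String, (String, Double))]): RDD[(String, Double)] = {
  val byUser = input.reduceByKey { case ((country, a1), (_, a2)) => (country, a1 + a2) }  // key stays K1 = userId
  byUser.map { case (_, (country, total)) => (country, total) }                           // re-key to K2 = country
}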


On Wed, Jul 23, 2014 at 11:22 AM, soumick86 
wrote:

> How can I transform the mapper key at the reducer output. The functions I
> have encountered are combineByKey, reduceByKey, etc that work on the values
> and not on the key. For example below, this is what I want to achieve but
> seems like I can only have K1 and not K2:
>
> Mapper->(K1,V1)->Reducer->(K2,V2)
>
> I must be missing something. Is there a class/method available? Also I am
> using the Java API
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Have-different-reduce-key-than-mapper-key-tp10503.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Very wierd behavior

2014-07-22 Thread Nathan Kronenfeld
I was wondering if anyone could provide an explanation for the behavior I'm
seeing.

I have an RDD, call it foo, not too complex, with a maybe 8 level deep DAG
with 2 shuffles, not empty, not even terribly big - small enough that some
partitions could be empty.

When I run foo.first, I get workers disconnecting, and applications die
When I run foo.mapPartitions.saveAsHadoopDataset, it works fine.

Anyone got an explanation for why that might be?

-Thanks, Nathan


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


new error for me

2014-07-21 Thread Nathan Kronenfeld
Does anyone know what this error means:
14/07/21 23:07:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
14/07/21 23:07:22 INFO TaskSetManager: Starting task 3.0:0 as TID 1620 on
executor 27: r104u05.oculus.local (PROCESS_LOCAL)
14/07/21 23:07:22 INFO TaskSetManager: Serialized task 3.0:0 as 8620 bytes
in 1 ms
14/07/21 23:07:36 INFO BlockManagerInfo: Added taskresult_1620 in memory on
r104u05.oculus.local:50795 (size: 64.9 MB, free: 18.3 GB)
14/07/21 23:07:36 INFO SendingConnection: Initiating connection to [r104u05.
oculus.local/192.168.0.105:50795]
14/07/21 23:07:57 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@1d86a150
java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:265)
at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
14/07/21 23:07:57 WARN SendingConnection: Error finishing connection to
r104u05.oculus.local/192.168.0.105:50795
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at
org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:202)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
connection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@13ad274d
14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
connection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 WARN TaskSetManager: Lost TID 1620 (task 3.0:0)
14/07/21 23:07:57 WARN TaskSetManager: Lost result for TID 1620 on host
r104u05.oculus.local

I've never seen this one before, and now it's coming up consistently.

Thanks,
 -Nathan


Re: persistence state of an RDD

2014-07-15 Thread Nathan Kronenfeld
Thanks


On Tue, Jul 15, 2014 at 10:39 AM, Praveen Seluka  wrote:

> Nathan, you are looking for SparkContext.getRDDStorageInfo which returns
> the information on how much is cached.
>
>
> On Tue, Jul 15, 2014 at 8:01 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> Is there a way of determining programmatically the cache state of an RDD?
>> Not its storage level, but how much of it is actually cached?
>>
>> Thanks,
>>     -Nathan
>>
>>
>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


persistence state of an RDD

2014-07-15 Thread Nathan Kronenfeld
Is there a way of determining programmatically the cache state of an RDD?
Not its storage level, but how much of it is actually cached?

Thanks,
-Nathan





-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Iteration question

2014-07-11 Thread Nathan Kronenfeld
Hi, folks.

We're having a problem with iteration that I don't understand.

We have the following test code:

org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)

def test (caching: Boolean, points: Int, iterations: Int) {
  var coords = sc.parallelize(Array.fill(points)(0.0, 0.0).zipWithIndex.map(_.swap))
  if (caching) coords.cache
  coords.count

  var iteration = 0
  val times = new Array[Double](iterations)

  do {
    val start = System.currentTimeMillis
    val thisIteration = iteration
    val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random))
    val newcoords = coords.zip(increments).map(p => {
      if (0 == p._1._1) println("Processing iteration " + thisIteration)
      (p._1._1,
       (p._1._2._1 + p._2._1,
        p._1._2._2 + p._2._2))
    })
    if (caching) newcoords.cache
    newcoords.count
    if (caching) coords.unpersist(false)
    coords = newcoords
    val end = System.currentTimeMillis

    times(iteration) = (end - start) / 1000.0
    println("Done iteration " + iteration + " in " + times(iteration) + " seconds")
    iteration = iteration + 1
  } while (iteration < iterations)

  for (i <- 0 until iterations) {
    println("Iteration " + i + ": " + times(i))
  }
}

If you run this on a local server with caching on and off, it appears that
the caching does what it is supposed to do - only the latest iteration is
processed each time through the loop.

However, despite this, the time for each iteration still gets slower and
slower.
For example, calling test(true, 5000, 100), I get the following times
(weeding out a few for brevity):
Iteration 0: 0.084
Iteration 10: 0.381
Iteration 20: 0.674
Iteration 30: 0.975
Iteration 40: 1.254
Iteration 50: 1.544
Iteration 60: 1.802
Iteration 70: 2.147
Iteration 80: 2.469
Iteration 90: 2.715
Iteration 99: 2.962

That's a 35x increase between the first and last iteration, when it should
be doing the same thing each time!

Without caching, the numbers are
Iteration 0: 0.642
Iteration 10: 0.516
Iteration 20: 0.823
Iteration 30: 1.17
Iteration 40: 1.514
Iteration 50: 1.655
Iteration 60: 1.992
Iteration 70: 2.177
Iteration 80: 2.472
Iteration 90: 2.814
Iteration 99: 3.018

slightly slower - but not significantly.

Does anyone know, if the caching is working, why is iteration 100 slower
than iteration 1?  And why is caching making so little difference?


Thanks,
-Nathan Kronenfeld

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Shuffle file consolidation

2014-05-29 Thread Nathan Kronenfeld
Thanks, I missed that.

One thing that's still unclear to me, even looking at that: does this
parameter have to be set when starting up the cluster, on each of the
workers, or can it be set by an individual client job?
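For reference, a minimal sketch of setting it from an individual job (assuming a SparkConf-based driver, i.e. Spark 0.9+; whether the shuffle code honors a per-job value rather than the workers' startup setting is exactly the open question here):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("sort-job")                          // placeholder app name
  .set("spark.shuffle.consolidateFiles", "true")   // must be set before the SparkContext is created
val sc = new SparkContext(conf)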


On Fri, May 23, 2014 at 10:13 AM, Han JU  wrote:

> Hi Nathan,
>
> There's some explanation in the spark configuration section:
>
> ```
> If set to "true", consolidates intermediate files created during a
> shuffle. Creating fewer files can improve filesystem performance for
> shuffles with large numbers of reduce tasks. It is recommended to set this
> to "true" when using ext4 or xfs filesystems. On ext3, this option might
> degrade performance on machines with many (>8) cores due to filesystem
> limitations.
> ```
>
>
> 2014-05-23 16:00 GMT+02:00 Nathan Kronenfeld :
>
> In trying to sort some largish datasets, we came across the
>> spark.shuffle.consolidateFiles property, and I found in the source code
>> that it is set, by default, to false, with a note to default it to true
>> when the feature is stable.
>>
>> Does anyone know what is unstable about this? If we set it true, what
>> problems should we anticipate?
>>
>> Thanks,
>> -Nathan Kronenfeld
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>
>
> --
> *JU Han*
>
> Data Engineer @ Botify.com
>
> +33 061960
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Shuffle file consolidation

2014-05-23 Thread Nathan Kronenfeld
In trying to sort some largish datasets, we came across the
spark.shuffle.consolidateFiles property, and I found in the source code
that it is set, by default, to false, with a note to default it to true
when the feature is stable.

Does anyone know what is unstable about this? If we set it true, what
problems should we anticipate?

Thanks,
-Nathan Kronenfeld


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: reading large XML files

2014-05-20 Thread Nathan Kronenfeld
Thanks, that sounds perfect



On Tue, May 20, 2014 at 1:38 PM, Xiangrui Meng  wrote:

> You can search for XMLInputFormat on Google. There are some
> implementations that allow you to specify the tag to split on, e.g.:
>
> https://github.com/lintool/Cloud9/blob/master/src/dist/edu/umd/cloud9/collection/XMLInputFormat.java
>
> On Tue, May 20, 2014 at 10:31 AM, Nathan Kronenfeld
>  wrote:
> > Unfortunately, I don't have a bunch of moderately big xml files; I have
> one,
> > really big file - big enough that reading it into memory as a single
> string
> > is not feasible.
> >
> >
> > On Tue, May 20, 2014 at 1:24 PM, Xiangrui Meng  wrote:
> >>
> >> Try sc.wholeTextFiles(). It reads the entire file into a string
> >> record. -Xiangrui
> >>
> >> On Tue, May 20, 2014 at 8:25 AM, Nathan Kronenfeld
> >>  wrote:
> >> > We are trying to read some large GraphML files to use in spark.
> >> >
> >> > Is there an easy way to read XML-based files like this that accounts
> for
> >> > partition boundaries and the like?
> >> >
> >> >  Thanks,
> >> >  Nathan
> >> >
> >> >
> >> > --
> >> > Nathan Kronenfeld
> >> > Senior Visualization Developer
> >> > Oculus Info Inc
> >> > 2 Berkeley Street, Suite 600,
> >> > Toronto, Ontario M5A 4J5
> >> > Phone:  +1-416-203-3003 x 238
> >> > Email:  nkronenf...@oculusinfo.com
> >
> >
> >
> >
> > --
> > Nathan Kronenfeld
> > Senior Visualization Developer
> > Oculus Info Inc
> > 2 Berkeley Street, Suite 600,
> > Toronto, Ontario M5A 4J5
> > Phone:  +1-416-203-3003 x 238
> > Email:  nkronenf...@oculusinfo.com
>
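For reference, a rough sketch of the XMLInputFormat approach suggested above. The input-format class, the configuration key names ("xmlinput.start"/"xmlinput.end"), the tag, and the path below are all placeholders that depend on which implementation you pick (the Cloud9 one linked above, Mahout's, etc.); if the implementation is written against the old mapred API, use sc.hadoopFile instead of sc.newAPIHadoopFile:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopConf = new Configuration()
hadoopConf.set("xmlinput.start", "<node")     // start tag to split records on (key name varies by implementation)
hadoopConf.set("xmlinput.end", "</node>")     // matching end tag

val nodes = sc.newAPIHadoopFile(
    "hdfs:///data/graph.graphml",             // placeholder path
    classOf[XmlInputFormat],                  // placeholder: whatever XML input format class you choose
    classOf[LongWritable],
    classOf[Text],
    hadoopConf)
  .map(_._2.toString)                         // each record is one <node>...</node> element as a string

The input format is what handles records that straddle HDFS split boundaries, which is the part that's hard to get right by hand.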



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: reading large XML files

2014-05-20 Thread Nathan Kronenfeld
Unfortunately, I don't have a bunch of moderately big xml files; I have
one, really big file - big enough that reading it into memory as a single
string is not feasible.


On Tue, May 20, 2014 at 1:24 PM, Xiangrui Meng  wrote:

> Try sc.wholeTextFiles(). It reads the entire file into a string
> record. -Xiangrui
>
> On Tue, May 20, 2014 at 8:25 AM, Nathan Kronenfeld
>  wrote:
> > We are trying to read some large GraphML files to use in spark.
> >
> > Is there an easy way to read XML-based files like this that accounts for
> > partition boundaries and the like?
> >
> >  Thanks,
> >  Nathan
> >
> >
> > --
> > Nathan Kronenfeld
> > Senior Visualization Developer
> > Oculus Info Inc
> > 2 Berkeley Street, Suite 600,
> > Toronto, Ontario M5A 4J5
> > Phone:  +1-416-203-3003 x 238
> > Email:  nkronenf...@oculusinfo.com
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


reading large XML files

2014-05-20 Thread Nathan Kronenfeld
We are trying to read some large GraphML files to use in spark.

Is there an easy way to read XML-based files like this that accounts for
partition boundaries and the like?

 Thanks,
 Nathan


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-05-16 Thread Nathan Kronenfeld
Serializing the main object isn't going to help here - it's the SparkContext
it's complaining about.

The problem is that, according to the code you sent, computeDwt has a
signature of:

class DWTSample ... {
  def computeDwt(sc: SparkContext, data: ArrayBuffer[(Int, Double)]): List[Double]
}

Do you need the SparkContext within that function?  That function is
executing out on your workers; they shouldn't be trying to send work
directly to other workers anyway, or be using RDDs or other spark contexts -
they should just be working with the data.  If you can eliminate the
SparkContext parameter there, you should be fine.

Also, I don't know how expensive DWTSample is to produce, or if you need a
separate instance for each record; if you need one per record, as the code
you sent indicates, it doesn't actually have to be serializable - you're
creating it out on the worker nodes, not sending it to them from the
client node.
If you don't need a unique instance per record, then you can either rely on
it being serializable to create just one and use it for every record, or, if
you would prefer it not to be serializable, create one per partition and use
it on each record in the partition:

val kk = series.mapPartitions(iter => {
  val sampler = new DWTsample()              // one sampler per partition
  iter.map(i => sampler.computeDwt(i._2))    // reused for each record
})

(assuming you eliminated the sc parameter, of course)


Hope this helps!


On Mon, May 12, 2014 at 2:27 AM, yh18190  wrote:
>
>> Hi,
>>
>> I am facing above exception when I am trying to apply a method(ComputeDwt)
>> on RDD[(Int,ArrayBuffer[(Int,Double)])] input.
>> I am even using extends Serialization option to serialize objects in
>> spark.Here is the code snippet.
>>
>> Could anyone suggest me what could be the problem and what should be done
>> to
>> overcome this issue.???
>>
>> input:series:RDD[(Int,ArrayBuffer[(Int,Double)])]
>> DWTsample extends Serialization is a class having computeDwt function.
>> sc: sparkContext
>>
>>  val  kk:RDD[(Int,List[Double])]=series.map(t=>(t._1,new
>> DWTsample().computeDwt(sc,t._2)))
>>
>> Error:
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Job-failed-java-io-NotSerializableException-org-apache-spark-SparkContext-tp5585.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>
> --
> Software Engineer
> Analytics Engineering Team@ Box
> Mountain View, CA
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: process_local vs node_local

2014-04-14 Thread Nathan Kronenfeld
Yes, I am caching; there's definitely garbage collection going on, and I am
pushing the limits of our memory, though I think we still fit; thanks, I'll
take a look at the wait parameter.
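For the record, bumping spark.locality.wait looks something like this - a sketch assuming a SparkConf-based setup (Spark 0.9+); on older versions the same key can be set as a system property before the SparkContext is created:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("binning-job")                 // placeholder app name
  .set("spark.locality.wait", "30000")       // milliseconds; the default is 3000 - 30s here is just an example
val sc = new SparkContext(conf)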


On Mon, Apr 14, 2014 at 8:39 PM, Matei Zaharia wrote:

> Spark can actually launch multiple executors on the same node if you
> configure it that way, but if you haven’t done that, this might mean that
> some tasks are reading data from the cache, and some from HDFS. (In the
> HDFS case Spark will only report it as NODE_LOCAL since HDFS isn’t tied to
> a particular executor process). For example, maybe you cached some data but
> not all the partitions of the RDD are in memory. Are you using caching here?
>
> There’s a locality wait setting in Spark (spark.locality.wait) that
> determines how long it will wait to go to the next locality level when it
> can’t launch stuff at its preferred one (e.g. to go from process to node).
> You can try increasing that too, by default it’s only 3000 ms. It might be
> that the whole RDD is cached but garbage collection causes it to give up
> waiting on some nodes and launch stuff on other nodes instead, which might
> be HDFS-local (due to data replication) but not cache-local.
>
> Matei
>
> On Apr 14, 2014, at 8:37 AM, dachuan  wrote:
>
> > I am confused about the process local and node local, too.
> >
> > In my current understanding of Spark, one application typically only has
> one executor in one node. However, node local means your data is in the
> same host, but in a different executor.
> >
> > This further means node local is the same with process local unless one
> node has two executors, which could only happen when one node has two
> Workers.
> >
> > Waiting for further discussion ..
> >
> >
> > On Mon, Apr 14, 2014 at 10:13 AM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
> > I've a fairly large job (5E9 records, ~1600 partitions) wherein, on a
> given stage, it looks like for the first half of the tasks, everything runs
> in process_local mode in ~10s/partition.  Then, from halfway through,
> everything starts running in node_local mode, and takes 10x as long or more.
> >
> > I read somewhere that the difference between the two had to do with the
> data being local to the running jvm, or another jvm on the same machine.
>  If that's the case, shouldn't the distribution of the two modes be more
> random?  If not, what exactly is the difference between the two modes?
>  Given how much longer it takes in node_local mode, it seems like the whole
> thing would probably run much faster just by waiting for the right jvm to
> be free.  Is there any way of forcing this?
> >
> >
> > Thanks,
> >   -Nathan
> >
> >
> > --
> > Nathan Kronenfeld
> > Senior Visualization Developer
> > Oculus Info Inc
> > 2 Berkeley Street, Suite 600,
> > Toronto, Ontario M5A 4J5
> > Phone:  +1-416-203-3003 x 238
> > Email:  nkronenf...@oculusinfo.com
> >
> >
> >
> > --
> > Dachuan Huang
> > Cellphone: 614-390-7234
> > 2015 Neil Avenue
> > Ohio State University
> > Columbus, Ohio
> > U.S.A.
> > 43210
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


process_local vs node_local

2014-04-14 Thread Nathan Kronenfeld
I've a fairly large job (5E9 records, ~1600 partitions) wherein, on a given
stage, it looks like for the first half of the tasks, everything runs in
process_local mode in ~10s/partition.  Then, from halfway through,
everything starts running in node_local mode, and takes 10x as long or more.

I read somewhere that the difference between the two had to do with the
data being local to the running jvm, or another jvm on the same machine.
 If that's the case, shouldn't the distribution of the two modes be more
random?  If not, what exactly is the difference between the two modes?
 Given how much longer it takes in node_local mode, it seems like the whole
thing would probably run much faster just by waiting for the right jvm to
be free.  Is there any way of forcing this?


Thanks,
  -Nathan


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: spark-streaming

2014-04-02 Thread Nathan Kronenfeld
We were using graph.zeroTime to figure out which files were relevant to
the DStream.

More generally, though, it's difficult for us to see how one would make a
custom DStream at all without access to the graph.

More egregiously, the disparity between the privacy and the documentation of
clearMetadata and addMetadata was particularly discouraging.



On Wed, Mar 19, 2014 at 7:09 PM, Tathagata Das
wrote:

> Hey Nathan,
>
> We made that private in order to reduce the visible public API, to have
> greater control in the future. Can you tell me more about the timing
> information that you want to get?
>
> TD
>
>
> On Fri, Mar 14, 2014 at 8:57 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> I'm trying to update some spark streaming code from 0.8.1 to 0.9.0.
>>
>> Among other things, I've found the function clearMetadata, whose comment
>> says:
>>
>> "...Subclasses of DStream may override this to clear their own
>> metadata along with the generated RDDs"
>>
>> yet which is declared private[streaming].
>>
>> How are subclasses expected to override this if it's private? If they
>> aren't, how and when should they now clear any extraneous data they have?
>>
>> Similarly, I now see no way to get the timing information - how is a
>> custom dstream supposed to do this now?
>>
>> Thanks,
>> -Nathan
>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Using an external jar in the driver, in yarn-standalone mode.

2014-03-25 Thread Nathan Kronenfeld
By 'use ... my main program' I presume you mean you have a main function in
a class file you want to use as your entry point.

SPARK_CLASSPATH, ADD_JAR, etc. add your jars on the master and the
workers... but they don't on the client.
For that, you're just using ordinary, everyday java/scala - so the jar just
has to be on the normal java classpath.

Could that be your issue?
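For concreteness, a sketch of that distinction (app name and path are placeholders): jars handed to the SparkContext get shipped to the executors, but the driver JVM itself still needs the jar on the ordinary classpath it was launched with.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("driver-with-external-jar")
  .setJars(Seq("/path/to/external.jar"))     // shipped to the executors, not to the driver
val sc = new SparkContext(conf)
// Classes from external.jar used directly in this driver program must come from
// the classpath the JVM was launched with; setJars does not affect that.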

  -Nathan



On Tue, Mar 25, 2014 at 2:18 PM, Sandy Ryza  wrote:

> Hi Julien,
>
> Have you called SparkContext#addJars?
>
> -Sandy
>
>
> On Tue, Mar 25, 2014 at 10:05 AM, Julien Carme wrote:
>
>> Hello,
>>
>> I have been struggling for ages to use an external jar in my spark driver
>> program, in yarn-standalone mode. I just want to use in my main program,
>> outside the calls to spark functions, objects that are defined in another
>> jar.
>>
>> I tried to set SPARK_CLASSPATH, ADD_JAR, I tried to use --addJar in the
>> spark-class arguments, I always end up with a "Class not found exception"
>> when I want to use classes defined in my jar.
>>
>> Any ideas?
>>
>> Thanks a lot,
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Akka error with largish job (works fine for smaller versions)

2014-03-25 Thread Nathan Kronenfeld
After digging deeper, I realized all the workers ran out of memory, each
leaving an hs_error.log file in /tmp/jvm- with the header:

# Native memory allocation (malloc) failed to allocate 2097152 bytes for
committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2761), pid=31426, tid=139549745604352
#
# JRE version: OpenJDK Runtime Environment (7.0_51-b02) (build
1.7.0_51-mockbuild_2014_01_15_01_3
9-b00)
# Java VM: OpenJDK 64-Bit Server VM (24.45-b08 mixed mode linux-amd64 )



We have 3 workers, each assigned 200G for spark.
The dataset is ~250g

All I'm doing is data.map(r => (getKey(r),
r)).sortByKey().map(_._2).coalesce(n).saveAsTextFile(), where n is the
original number of files in the dataset.

This worked fine under spark 0.8.1, with the same setup; I haven't changed
this code since upgrading to 0.9.0.

I took a look at a worker's memory before it ran out, using jmap and jhat;
they indicated file handles as the biggest memory user (which I guess makes
sense for a sort) - but the total was nowhere close to 200g, so I find
their output somewhat suspect.



On Tue, Mar 25, 2014 at 6:59 AM, Andrew Ash  wrote:

> Possibly one of your executors is in the middle of a large stop-the-world
> GC and doesn't respond to network traffic during that period?  If you
> shared some information about how each node in your cluster is set up (heap
> size, memory, CPU, etc) that might help with debugging.
>
> Andrew
>
>
> On Mon, Mar 24, 2014 at 9:13 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> What does this error mean:
>>
>> @hadoop-s2.oculus.local:45186]: Error [Association failed with
>> [akka.tcp://spark@hadoop-s2.oculus.local:45186]] [
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://spark@hadoop-s2.oculus.local:45186]
>> Caused by:
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>> Connection refused: hadoop-s2.oculus.loca\
>> l/192.168.0.47:45186
>> ]
>>
>> ?
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: mapPartitions use case

2014-03-24 Thread Nathan Kronenfeld
I've seen two cases most commonly:

The first is when I need to create some processing object to process each
record.  If that object creation is expensive, creating one per record
becomes prohibitive.  So instead, we use mapPartitions, create one object
per partition, and use it on each record in the partition.

The other is that, when summarizing data, I've often found it much more
efficient to run a mutable summary object over each record in a partition
and then reduce those per-partition results, rather than creating a summary
object per record and reducing that much larger set of summary
objects.  Again, it saves a lot of object creation.
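A minimal sketch of both patterns (ExpensiveParser and Record are hypothetical stand-ins for whatever is expensive to build and whatever it produces):

import org.apache.spark.rdd.RDD

// Pattern 1: build the expensive object once per partition, not once per record.
def parseAll(lines: RDD[String]): RDD[Record] =
  lines.mapPartitions { iter =>
    val parser = new ExpensiveParser()        // hypothetical, costly to construct
    iter.map(line => parser.parse(line))      // reused for every record in the partition
  }

// Pattern 2: one mutable summary per partition, then reduce the handful of summaries.
class Summary extends Serializable {
  var count = 0L
  var total = 0.0
  def add(x: Double): Unit = { count += 1; total += x }
  def merge(that: Summary): Summary = { count += that.count; total += that.total; this }
}

def summarize(values: RDD[Double]): Summary =
  values.mapPartitions { iter =>
    val summary = new Summary()               // mutated locally, one per partition
    iter.foreach(summary.add)
    Iterator(summary)
  }.reduce((a, b) => a.merge(b))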



On Mon, Mar 24, 2014 at 8:57 AM, Jaonary Rabarisoa wrote:

> Dear all,
>
> Sorry for asking such a basic question, but someone can explain when one
> should use mapPartiontions instead of map.
>
> Thanks
>
> Jaonary
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Akka error with largish job (works fine for smaller versions)

2014-03-24 Thread Nathan Kronenfeld
What does this error mean:

@hadoop-s2.oculus.local:45186]: Error [Association failed with
[akka.tcp://spark@hadoop-s2.oculus.local:45186]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://spark@hadoop-s2.oculus.local:45186]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: hadoop-s2.oculus.loca\
l/192.168.0.47:45186
]

?

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


spark-streaming

2014-03-14 Thread Nathan Kronenfeld
I'm trying to update some spark streaming code from 0.8.1 to 0.9.0.

Among other things, I've found the function clearMetadata, whose comment
says:

"...Subclasses of DStream may override this to clear their own metadata
along with the generated RDDs"

yet which is declared private[streaming].

How are subclasses expected to override this if it's private? If they
aren't, how and when should they now clear any extraneous data they have?

Similarly, I now see no way to get the timing information - how is a custom
dstream supposed to do this now?

Thanks,
        -Nathan



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Trying to connect to spark from within a web server

2014-02-28 Thread Nathan Kronenfeld
I do notice that scala 2.9.2 is being included because of net.liftweb.

Also - I don't know if I just missed it before, or if it wasn't doing this
before and my latest changes get it a little farther - but I'm now seeing
the following in the spark logs:

14/02/28 20:13:29 INFO actor.ActorSystemImpl: RemoteClientStarted
@akka://spark@hadoop-s1.oculus.local:35212
14/02/28 20:13:29 ERROR NettyRemoteTransport(null): dropping message
RegisterApplication(ApplicationDescription(Web Service Spark Instance)) for
non-local recipient akka://sparkMaster@192.168.0.46:7077/user/Master at
akka://sparkMaster@hadoop-s1.oculus.local:7077 local is
akka://sparkMaster@hadoop-s1.oculus.local:7077
14/02/28 20:13:49 ERROR NettyRemoteTransport(null): dropping message
RegisterApplication(ApplicationDescription(Web Service Spark Instance)) for
non-local recipient akka://sparkMaster@192.168.0.46:7077/user/Master at
akka://sparkMaster@hadoop-s1.oculus.local:7077 local is
akka://sparkMaster@hadoop-s1.oculus.local:7077
14/02/28 20:14:09 ERROR NettyRemoteTransport(null): dropping message
RegisterApplication(ApplicationDescription(Web Service Spark Instance)) for
non-local recipient akka://sparkMaster@192.168.0.46:7077/user/Master at
akka://sparkMaster@hadoop-s1.oculus.local:7077 local is
akka://sparkMaster@hadoop-s1.oculus.local:7077
14/02/28 20:14:32 INFO actor.ActorSystemImpl: RemoteClientShutdown
@akka://spark@hadoop-s1.oculus.local:35212



On Sat, Feb 22, 2014 at 1:58 PM, Soumya Simanta wrote:

> Mostly likely all your classes/jars that are required to connect to Spark
> and not being loaded or the incorrect versions are being loaded when you
> start to do this from inside the web container (Tomcat).
>
>
>
>
>
> On Sat, Feb 22, 2014 at 1:51 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> yes, but only when I try to connect from a web service running in Tomcat.
>>
>> When I try to connect using a stand-alone program, using the same
>> parameters, it works fine.
>>
>>
>> On Sat, Feb 22, 2014 at 12:15 PM, Mayur Rustagi 
>> wrote:
>>
>>> So Spark is running on that IP, web ui is loading on that IP showing
>>> workers & when you connect to that IP with javaAPI the cluster appears to
>>> be down to it?
>>>
>>> Mayur Rustagi
>>> Ph: +919632149971
>>> h <https://twitter.com/mayur_rustagi>ttp://www.sigmoidanalytics.com
>>> https://twitter.com/mayur_rustagi
>>>
>>>
>>>
>>> On Fri, Feb 21, 2014 at 10:22 PM, Nathan Kronenfeld <
>>> nkronenf...@oculusinfo.com> wrote:
>>>
>>>> Netstat gives exactly the expected IP address (not a 127, but a
>>>> 192...).
>>>> I tried it anyway, though... exactly the same results, but with a
>>>> number instead of a name.
>>>> Oh, and I forgot to mention last time, in case it makes a difference -
>>>> I'm running 0.8.1, not 0.9.0, at least for now
>>>>
>>>>
>>>>
>>>> On Sat, Feb 22, 2014 at 12:50 AM, Mayur Rustagi <
>>>> mayur.rust...@gmail.com> wrote:
>>>>
>>>>> most likely the master is binding to a unique address and you are
>>>>> connecting to some other internal address. Master can bind to random
>>>>> internal address 127.0... or even your machine IP at that time.
>>>>> Easiest is to check
>>>>> netstat -an |grep 7077
>>>>> This will give you which IP to bind to exactly when launching spark
>>>>> context.
>>>>>
>>>>> Mayur Rustagi
>>>>> Ph: +919632149971
>>>>> h <https://twitter.com/mayur_rustagi>ttp://www.sigmoidanalytics.com
>>>>> https://twitter.com/mayur_rustagi
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 21, 2014 at 9:36 PM, Nathan Kronenfeld <
>>>>> nkronenf...@oculusinfo.com> wrote:
>>>>>
>>>>>> Can anyone help me here?
>>>>>>
>>>>>> I've got a small spark cluster running on three machines - hadoop-s1,
>>>>>> hadoop-s2, and hadoop-s3 - with s1 acting master, and all three acting as
>>>>>> workers.  It works fine - I can connect with spark-shell, I can run 
>>>>>> jobs, I
>>>>>> can see the web ui.
>>>>>>
>>>>>> The web UI says:
>>>>>> Spark Master at spark://hadoop-s1.oculus.local:7077
>>>>>> URL: spark://hadoop-s1.oculus.local:7077
>>>>>>
>>>>>> I've connected to it fine usin

spark failure

2014-02-24 Thread Nathan Kronenfeld
I'm using spark 0.8.1, and trying to run a job from a new remote client (it
works fine when run directly from the master).

When I try and run it, the job just fails without doing anything.

Unfortunately, I also can't find anywhere that tells me why it fails.
 I'll add the bits of the logs below, but there really isn't much.

Does anyone know how to tell why it's failing? I assume it must be getting
an exception somewhere, but it isn't telling me about it.

On the client, I see:
14/02/24 23:44:43 INFO Client$ClientActor: Executor added:
app-20140224234441-0003/4 on
worker-20140224140443-hadoop-s2.oculus.local-40819
(hadoop-s2.oculus.local:7077) with 32 cores
14/02/24 23:44:43 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140224234441-0003/4 on hostPort hadoop-s2.oculus.local:7077 with 32
cores, 200.0 GB RAM
14/02/24 23:44:43 INFO Client$ClientActor: Executor updated:
app-20140224234441-0003/4 is now RUNNING
14/02/24 23:44:43 INFO FileInputFormat: Total input paths to process : 200
14/02/24 23:44:43 INFO Client$ClientActor: Executor updated:
app-20140224234441-0003/1 is now FAILED (Command exited with code 1)
14/02/24 23:44:43 INFO SparkDeploySchedulerBackend: Executor
app-20140224234441-0003/1 removed: Command exited with code 1

The master log just has:
14/02/24 23:44:43 INFO master.Master: Launching executor
app-20140224234441-0003/4 on worker
worker-20140224140443-hadoop-s2.oculus.local-40819
14/02/24 23:44:45 INFO master.Master: Removing executor
app-20140224234441-0003/4 because it is FAILED

(no other mention of 0003/4)

The worker log has:
14/02/24 23:44:43 INFO worker.Worker: Asked to launch executor
app-20140224234441-0003/4 for Pyramid Binning(ndk)
14/02/24 23:44:43 INFO worker.ExecutorRunner: Launch command:
"/usr/java/jdk1.7.0_25-cloudera/bin/java" "-cp"
"math-utilities-0.2.jar:binning-utilities-0.2.jar:tile-generation-0.2.jar:hbase-client-0.95.2-cdh5.0.0-beta-1.jar:hbase-protocol-0.95.2-cdh5.0.0-beta-1.jar:hbase-common-0.95.2-cdh5.0.0-beta-1.jar:htrace-core-2.01.jar:avro-1.7.4.jar:commons-compress-1.4.1.jar:scala-library-2.9.3.jar:scala-compiler-2.9.3.jar:/opt/spark/conf:spark-assembly-0.8.1-incubating-hadoop2.2.0-mr1-cdh5.0.0-beta-1.jar"
"-Dspark.executor.memory=200G" "-Xms204800M" "-Xmx204800M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka://spark@hadoop-client.oculus.local:41101/user/CoarseGrainedScheduler"
"4" "hadoop-s2.oculus.local" "32" "app-20140224234441-0003"
14/02/24 23:44:45 INFO worker.Worker: Executor app-20140224234441-0003/4
finished with state FAILED message Command exited with code 1 exitStatus 1


Again, nothing else

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com