Fwd: Spark standalone workers, executors and JVMs

2016-05-02 Thread Simone Franzini
I am still a little bit confused about workers, executors and JVMs in
standalone mode.
Are worker processes and executors independent JVMs or do executors run
within the worker JVM?
I have some memory-rich nodes (192GB) and I would like to avoid deploying
massive JVMs due to well known performance issues (GC and such).
As of Spark 1.4 it is possible to either deploy multiple workers
(SPARK_WORKER_INSTANCES + SPARK_WORKER_CORES) or multiple executors per
worker (--executor-cores). Which option is preferable and why?

Thanks,
Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: Spark standalone workers, executors and JVMs

2016-05-04 Thread Simone Franzini
Hi Mohammed,

Thanks for your reply. I agree with you, however a single application can
use multiple executors as well, so I am still not clear which option is
best. Let me make an example to be a little more concrete.

Let's say I am only running a single application. Let's assume again that I
have 192GB of memory and 24 cores on each node. Which one of the following
two options is best and why:
1. Running 6 workers with 32GB each and 1 executor/worker (i.e. set
SPARK_WORKER_INSTANCES=6, leave spark.executor.cores to its default, which
is to assign all available cores to an executor in standalone mode).
2. Running 1 worker with 192GB memory and 6 executors/worker (i.e.
SPARK_WORKER_INSTANCES=1 and spark.executor.cores=5,
spark.executor.memory=32GB).

Also one more question. I understand that workers and executors are
different processes. How many resources is the worker process actually
using and how do I set those? As far as I understand the worker does not
need many resources, as it is only spawning up executors. Is that correct?

Thanks,
Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Mon, May 2, 2016 at 7:47 PM, Mohammed Guller 
wrote:

> The workers and executors run as separate JVM processes in the standalone
> mode.
>
>
>
> The use of multiple workers on a single machine depends on how you will be
> using the clusters. If you run multiple Spark applications simultaneously,
> each application gets its own its executor. So, for example, if you
> allocate 8GB to each application, you can run 192/8 Spark applications
> simultaneously (assuming you also have large number of cores). Each
> executor has only 8GB heap, so GC should not be issue. Alternatively, if
> you know that you will have few applications running simultaneously on that
> cluster, running multiple workers on each machine will allow you to avoid
> GC issues associated with allocating large heap to a single JVM process.
> This option allows you to run multiple executors for an application on a
> single machine and each executor can be configured with optimal memory.
>
>
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Simone Franzini [mailto:captainfr...@gmail.com]
> *Sent:* Monday, May 2, 2016 9:27 AM
> *To:* user
> *Subject:* Fwd: Spark standalone workers, executors and JVMs
>
>
>
> I am still a little bit confused about workers, executors and JVMs in
> standalone mode.
>
> Are worker processes and executors independent JVMs or do executors run
> within the worker JVM?
>
> I have some memory-rich nodes (192GB) and I would like to avoid deploying
> massive JVMs due to well known performance issues (GC and such).
>
> As of Spark 1.4 it is possible to either deploy multiple workers
> (SPARK_WORKER_INSTANCES + SPARK_WORKER_CORES) or multiple executors per
> worker (--executor-cores). Which option is preferable and why?
>
>
>
> Thanks,
>
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
>
>


Spark on DSE Cassandra with multiple data centers

2016-05-11 Thread Simone Franzini
I am running Spark on DSE Cassandra with multiple analytics data centers.
It is my understanding that with this setup you should have a CFS file
system for each data center. I was able to create an additional CFS file
system as described here:
http://docs.datastax.com/en/latest-dse/datastax_enterprise/ana/anaCFS.html
I verified that the additional CFS file system is created properly.

I am now following the instructions here to configure Spark on the second
data center to use its own CFS:
http://docs.datastax.com/en/latest-dse/datastax_enterprise/spark/sparkConfHistoryServer.html
However, running:
dse hadoop fs -mkdir :/spark/events
fails with:
WARN You are going to access CFS keyspace: cfs in data center:
. It will not work because the replication
factor for this keyspace in this data center is 0.

Bad connection to FS. command aborted. exception: UnavailableException()

That is, it appears that the  in the hadoop command is
being ignored and it is trying to connect to cfs: rather than
additional_cfs.

Anybody else ran into this?


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Number of sortBy output partitions

2016-07-21 Thread Simone Franzini
Hi all,

I am really struggling with the behavior of sortBy. I am running sortBy on
a fairly large dataset (~20GB), that I partitioned in 1200 tasks. The
output of the sortBy stage in the Spark UI shows that it ran with 1200
tasks.

However, when I run the next operation (e.g. filter or saveToTextFile) I
find myself with only 7 partitions. The problem with this is that those
partitions are extremely skewed with 99.99% of the data being in a 12GB
partitions and everything else being in tiny partitions.

It appears (by writing to file) that the data is partitioned according to
the value that I used to sort on (as expected). The problem is that 99.99%
of the data has the same value and therefore ends up in the same partition.

I tried changing the number of tasks in the sortBy as well as a repartition
after the sortBy but to no avail. Is there any way of changing this
behavior? I fear not as this is probably due to the way that sortBy is
implemented, but I thought I would ask anyway.

Should it matter, I am running Spark 1.4.2 (DataStax Enterprise).

Thanks,

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


AVRO specific records

2014-11-05 Thread Simone Franzini
How can I read/write AVRO specific records?
I found several snippets using generic records, but nothing with specific
records so far.

Thanks,
Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: AVRO specific records

2014-11-06 Thread Simone Franzini
Benjamin,

Thanks for the snippet. I have tried using it, but unfortunately I get the
following exception. I am clueless at what might be wrong. Any ideas?

java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at
org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:115)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Wed, Nov 5, 2014 at 4:24 PM, Laird, Benjamin <
benjamin.la...@capitalone.com> wrote:

> Something like this works and is how I create an RDD of specific records.
>
> val avroRdd = sc.newAPIHadoopFile("twitter.avro",
> classOf[AvroKeyInputFormat[twitter_schema]],
> classOf[AvroKey[twitter_schema]], classOf[NullWritable], conf) (From
> https://github.com/julianpeeters/avro-scala-macro-annotation-examples/blob/master/spark/src/main/scala/AvroSparkScala.scala)
> Keep in mind you'll need to use the kryo serializer as well.
>
> From: Frank Austin Nothaft 
> Date: Wednesday, November 5, 2014 at 5:06 PM
> To: Simone Franzini 
> Cc: "user@spark.apache.org" 
> Subject: Re: AVRO specific records
>
> Hi Simone,
>
> Matt Massie put together a good tutorial on his blog
> <http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/>. If you’re
> looking for more code using Avro, we use it pretty extensively in our
> genomics project. Our Avro schemas are here
> <https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>,
> and we have serialization code here
> <https://github.com/bigdatagenomics/adam/tree/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization>.
> We use Parquet for storing the Avro records, but there is also an Avro
> HadoopInputFormat.
>
> Regards,
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
>
> On Nov 5, 2014, at 1:25 PM, Simone Franzini 
> wrote:
>
> How can I read/write AVRO specific records?
> I found several snippets using generic records, but nothing with specific
> records so far.
>
> Thanks,
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
>
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed.  If the reader of this message is not the
> intended recipient, you are hereby notified that any review,
> retransmission, dissemination, distribution, copying or other use of, or
> taking of any action in reliance upon this information is strictly
> prohibited. If you have received this communication in error, please
> contact the sender and delete the material from your computer.
>


Re: AVRO specific records

2014-11-07 Thread Simone Franzini
Ok, that turned out to be a dependency issue with Hadoop1 vs. Hadoop2 that
I have not fully solved yet. I am able to run with Hadoop1 and AVRO in
standalone mode but not with Hadoop2 (even after trying to fix the
dependencies).

Anyway, I am now trying to write to AVRO, using a very similar snippet to
the one to read from AVRO:

val withValues : RDD[(AvroKey[Subscriber], NullWritable)] = records.map{s
=> (new AvroKey(s), NullWritable.get)}
val outPath = "myOutputPath"
val writeJob = new Job()
FileOutputFormat.setOutputPath(writeJob, new Path(outPath))
AvroJob.setOutputKeySchema(writeJob, Subscriber.getClassSchema())
writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[Any]])
records.saveAsNewAPIHadoopFile(outPath,
classOf[AvroKey[Subscriber]],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[Subscriber]],
writeJob.getConfiguration)

Now, my problem is that this writes to a plain text file. I need to write
to binary AVRO. What am I missing?

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Thu, Nov 6, 2014 at 3:15 PM, Simone Franzini 
wrote:

> Benjamin,
>
> Thanks for the snippet. I have tried using it, but unfortunately I get the
> following exception. I am clueless at what might be wrong. Any ideas?
>
> java.lang.IncompatibleClassChangeError: Found interface
> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
> at
> org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:115)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
> On Wed, Nov 5, 2014 at 4:24 PM, Laird, Benjamin <
> benjamin.la...@capitalone.com> wrote:
>
>> Something like this works and is how I create an RDD of specific records.
>>
>> val avroRdd = sc.newAPIHadoopFile("twitter.avro",
>> classOf[AvroKeyInputFormat[twitter_schema]],
>> classOf[AvroKey[twitter_schema]], classOf[NullWritable], conf) (From
>> https://github.com/julianpeeters/avro-scala-macro-annotation-examples/blob/master/spark/src/main/scala/AvroSparkScala.scala)
>> Keep in mind you'll need to use the kryo serializer as well.
>>
>> From: Frank Austin Nothaft 
>> Date: Wednesday, November 5, 2014 at 5:06 PM
>> To: Simone Franzini 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: AVRO specific records
>>
>> Hi Simone,
>>
>> Matt Massie put together a good tutorial on his blog
>> <http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/>. If you’re
>> looking for more code using Avro, we use it pretty extensively in our
>> genomics project. Our Avro schemas are here
>> <https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>,
>> and we have serialization code here
>> <https://github.com/bigdatagenomics/adam/tree/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization>.
>> We use Parquet for storing the Avro records, but there is also an Avro
>> HadoopInputFormat.
>>
>> Regards,
>>
>> Frank Austin Nothaft
>> fnoth...@berkeley.edu
>> fnoth...@eecs.berkeley.edu
>> 202-340-0466
>>
>> On Nov 5, 2014, at 1:25 PM, Simone Franzini 
>> wrote:
>>
>> How can I read/write AVRO specific records?
>> I found several snippets using generic records, but nothing with specific
>> records so far.
>>
>> Thanks,
>> Simone Franzini, PhD
>>
>> http://www.linkedin.com/in/simonefranzini
>>
>>
>>
>> --
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed.  If the reader of this message is not the
>> intended recipient, you are hereby notified that any review,
>> retransmission, dissemination, distribution, copying or other use of, or
>> taking of any action in reliance upon this information is strictly
>> prohibited. If you have received this communication in error, please
>> contact the sender and delete the material from your computer.
>>
>
>


Accessing RDD within another RDD map

2014-11-13 Thread Simone Franzini
The following code fails with NullPointerException in RDD class on the
count function:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(11 to 20)
rdd1.map{ i =>
 rdd2.count
}
.foreach(println(_))

The same goes for any other action I am trying to perform inside the map
statement. I am failing to understand what I am doing wrong.
Can anyone help with this?

Thanks,
Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Declaring multiple RDDs and efficiency concerns

2014-11-14 Thread Simone Franzini
Let's say I have to apply a complex sequence of operations to a certain RDD.
In order to make code more modular/readable, I would typically have
something like this:

object myObject {
  def main(args: Array[String]) {
val rdd1 = function1(myRdd)
val rdd2 = function2(rdd1)
val rdd3 = function3(rdd2)
  }

  def function1(rdd: RDD) : RDD = { doSomething }
  def function2(rdd: RDD) : RDD = { doSomethingElse }
  def function3(rdd: RDD) : RDD = { doSomethingElseYet }
}

So I am explicitly declaring vals for the intermediate steps. Does this end
up using more storage than if I just chained all of the operations and
declared only one val instead?
If yes, is there a better way to chain together the operations?
Ideally I would like to do something like:

val rdd = function1.function2.function3

Is there a way I can write the signature of my functions to accomplish
this? Is this also an efficiency issue or just a stylistic one?

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Reading nested JSON data with Spark SQL

2014-11-19 Thread Simone Franzini
I have been using Spark SQL to read in JSON data, like so:
val myJsonFile = sqc.jsonFile(args("myLocation"))
myJsonFile.registerTempTable("myTable")
sqc.sql("mySQLQuery").map { row =>
myFunction(row)
}

And then in myFunction(row) I can read the various columns with the
Row.getX methods. However, this methods only work for basic types (string,
int, ...).
I was having some trouble reading columns that are arrays or maps (i.e.
other JSON objects).

I am now using Spark 1.2 from the Cloudera snapshot and I noticed that
there is a new method getAs. I was able to use it to read for example an
array of strings like so:
t.getAs[Buffer[CharSequence]](12)

However, if I try to read a column with a nested JSON object like this:
t.getAs[Map[String, Any]](11)

I get the following error:
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to
scala.collection.immutable.Map

How can I read such a field? Am I just missing something small or should I
be looking for a completely different alternative to reading JSON?

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: Reading nested JSON data with Spark SQL

2014-11-19 Thread Simone Franzini
This works great, thank you!

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Wed, Nov 19, 2014 at 3:40 PM, Michael Armbrust 
wrote:

> You can extract the nested fields in sql: SELECT field.nestedField ...
>
> If you don't do that then nested fields are represented as rows within
> rows and can be retrieved as follows:
>
> t.getAs[Row](0).getInt(0)
>
> Also, I would write t.getAs[Buffer[CharSequence]](12) as
> t.getAs[Seq[String]](12) since we don't guarantee the return type will be
> a buffer.
>
>
> On Wed, Nov 19, 2014 at 1:33 PM, Simone Franzini 
> wrote:
>
>> I have been using Spark SQL to read in JSON data, like so:
>> val myJsonFile = sqc.jsonFile(args("myLocation"))
>> myJsonFile.registerTempTable("myTable")
>> sqc.sql("mySQLQuery").map { row =>
>> myFunction(row)
>> }
>>
>> And then in myFunction(row) I can read the various columns with the
>> Row.getX methods. However, this methods only work for basic types (string,
>> int, ...).
>> I was having some trouble reading columns that are arrays or maps (i.e.
>> other JSON objects).
>>
>> I am now using Spark 1.2 from the Cloudera snapshot and I noticed that
>> there is a new method getAs. I was able to use it to read for example an
>> array of strings like so:
>> t.getAs[Buffer[CharSequence]](12)
>>
>> However, if I try to read a column with a nested JSON object like this:
>> t.getAs[Map[String, Any]](11)
>>
>> I get the following error:
>> java.lang.ClassCastException:
>> org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to
>> scala.collection.immutable.Map
>>
>> How can I read such a field? Am I just missing something small or should
>> I be looking for a completely different alternative to reading JSON?
>>
>> Simone Franzini, PhD
>>
>> http://www.linkedin.com/in/simonefranzini
>>
>
>


Re: How can I read this avro file using spark & scala?

2014-11-21 Thread Simone Franzini
I have also been struggling with reading avro. Very glad to hear that there
is a new avro library coming in Spark 1.2 (which by the way, seems to have
a lot of other very useful improvements).

In the meanwhile, I have been able to piece together several snippets/tips
that I found from various sources and I am now able to read/write avro
correctly. From my understanding, you basically need 3 pieces:
1. Use the kryo serializer.
2. Register your avro classes. I have done this using twitter chill 0.4.0.
3. Read/write avro with a snippet of code like the one you posted.

Here is relevant code (hopefully all of it).

// All of the following are needed in order to read/write AVRO files
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.{ FileSystem, Path }
// Uncomment the following line if you want to use generic AVRO, I am using
specific
//import org.apache.avro.generic.GenericData
import org.apache.avro.Schema
import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat,
AvroKeyOutputFormat }
import org.apache.avro.mapred.AvroKey
// Kryo/avro serialization stuff
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.{ KryoSerializer, KryoRegistrator }

object MyApp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "MyRegistrator")

}

// Read
val readJob = new Job()
AvroJob.setInputKeySchema(readJob, schema)
sc.newAPIHadoopFile(inputPath,
classOf[AvroKeyInputFormat[MyAvroClass]],
classOf[AvroKey[MyAvroClass]],
classOf[NullWritable],
readJob.getConfiguration)
.map { t => t._1.datum }

// Write
val rddAvroWritable = rdd.map { s => (new AvroKey(s), NullWritable.get) }
val writeJob = new Job()
AvroJob.setOutputKeySchema(writeJob, schema)

writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[MyAvroClass]])
rddAvroWritable.saveAsNewAPIHadoopFile(outputPath,
classOf[AvroKey[MyAvroClass]],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[MyAvroClass]],
writeJob.getConfiguration)

}
}


class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
// Put a line like the following for each of your Avro classes if you use
specific Avro
// If you use generic Avro, chill also has a function for that:
GenericRecordSerializer
kryo.register(classOf[MyAvroClass],
AvroSerializer.SpecificRecordBinarySerializer[MyAvroClass])
}
}

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Fri, Nov 21, 2014 at 7:04 AM, thomas j  wrote:

> I've been able to load a different avro file based on GenericRecord with:
>
> val person = sqlContext.avroFile("/tmp/person.avro")
>
> When I try to call `first()` on it, I get "NotSerializableException"
> exceptions again:
>
> person.first()
>
> ...
> 14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 20)
> java.io.NotSerializableException:
> org.apache.avro.generic.GenericData$Record
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ...
>
> Apart from this I want to transform the records into pairs of (user_id,
> record). I can do this by specifying the offset of the user_id column with
> something like this:
>
> person.map(r => (r.getInt(2), r)).take(4).collect()
>
> Is there any way to be able to specify the column name ("user_id") instead
> of needing to know/calculate the offset somehow?
>
> Thanks again
>
>
> On Fri, Nov 21, 2014 at 11:48 AM, thomas j 
> wrote:
>
>> Thanks for the pointer Michael.
>>
>> I've downloaded spark 1.2.0 from
>> https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
>> built the spark-avro repo you linked to.
>>
>> When I run it against the example avro file linked to in the
>> documentation it works. However, when I try to load my avro file (linked to
>> in my original question) I receive the following error:
>>
>> java.lang.RuntimeException: Unsupported type LONG
>> at scala.sys.package$.error(package.scala:27)
>> at com.datab

Kryo NPE with Array

2014-11-25 Thread Simone Franzini
I am running into the following NullPointerException:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
underlying (scala.collection.convert.Wrappers$JListWrapper)
myArrayField (MyCaseClass)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

I have been running into similar issues when using avro classes, that I was
able to resolve by registering them with a Kryo serializer that uses
chill-avro. However, in this case the field is in a case class and it seems
that registering the class does not help.

I found this stack overflow that seems to be relevant:
http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
I have this line of code translated to Scala, that supposedly solves the
issue:

val kryo = new Kryo()
kryo.getInstantiatorStrategy().asInstanceOf[Kryo.DefaultInstantiatorStrategy].setFallbackInstantiatorStrategy(new
StdInstantiatorStrategy())

However, I am not sure where this line should be placed to take effect.

I already have the following, should it go somewhere in here?
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(...)
}
}


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: Kryo NPE with Array

2014-11-26 Thread Simone Franzini
I guess I already have the answer of what I have to do here, which is to
configure the kryo object with the strategy as above.
Now the question becomes: how can I pass this custom kryo configuration to
the spark kryo serializer / kryo registrator?
I've had a look at the code but I am still fairly new to Scala and I can't
see how I would do this. In the worst case, could I override the newKryo
method and put my configuration there? It appears to me that method is the
one where the kryo instance is created.

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Tue, Nov 25, 2014 at 2:38 PM, Simone Franzini 
wrote:

> I am running into the following NullPointerException:
>
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> underlying (scala.collection.convert.Wrappers$JListWrapper)
> myArrayField (MyCaseClass)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>
> I have been running into similar issues when using avro classes, that I
> was able to resolve by registering them with a Kryo serializer that uses
> chill-avro. However, in this case the field is in a case class and it seems
> that registering the class does not help.
>
> I found this stack overflow that seems to be relevant:
>
> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
> I have this line of code translated to Scala, that supposedly solves the
> issue:
>
> val kryo = new Kryo()
> kryo.getInstantiatorStrategy().asInstanceOf[Kryo.DefaultInstantiatorStrategy].setFallbackInstantiatorStrategy(new
> StdInstantiatorStrategy())
>
> However, I am not sure where this line should be placed to take effect.
>
> I already have the following, should it go somewhere in here?
> class MyRegistrator extends KryoRegistrator {
> override def registerClasses(kryo: Kryo) {
> kryo.register(...)
> }
> }
>
>
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>


Re: Spark SQL 1.0.0 - RDD from snappy compress avro file

2014-11-29 Thread Simone Franzini
Did you have a look at my reply in this thread?

http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-read-this-avro-file-using-spark-amp-scala-td19400.html

I am using 1.1.0 though, so not sure if that code would work entirely with
1.0.0, but you can try.


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Sat, Nov 29, 2014 at 5:43 AM, Vikas Agarwal 
wrote:

> Just in case it helps: https://github.com/databricks/spark-avro
>
> On Fri, Nov 28, 2014 at 8:48 PM, cjdc  wrote:
>
>> To make it simpler, for now forget the snappy compression. Just assume
>> they
>> are binary Avro files...
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-0-RDD-from-snappy-compress-avro-file-tp19998p20008.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Regards,
> Vikas Agarwal
> 91 – 9928301411
>
> InfoObjects, Inc.
> Execution Matters
> http://www.infoobjects.com
> 2041 Mission College Boulevard, #280
> Santa Clara, CA 95054
> +1 (408) 988-2000 Work
> +1 (408) 716-2726 Fax
>
>


Re: Kryo NPE with Array

2014-12-02 Thread Simone Franzini
I finally solved this issue. The problem was that:
1. I defined a case class with a Buffer[MyType] field.
2. I instantiated the class with the field set to the value given by an
implicit conversion from a Java list, which is supposedly a Buffer.
3. However, the underlying type of that field was instead
scala.collection.convert.Wrappers.JListWrapper, as noted in the exception
above. This type was not registered with Kryo and so that's why I got the
exception.

Registering the type did not solve the problem. However, an additional call
to .toBuffer did solve the problem, since the Buffer class is registered
through the Chill AllScalaRegistrar which is called by the Spark Kryo
serializer.

I thought I'd document this in case somebody else is running into a similar
issue.

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Wed, Nov 26, 2014 at 7:40 PM, Simone Franzini 
wrote:

> I guess I already have the answer of what I have to do here, which is to
> configure the kryo object with the strategy as above.
> Now the question becomes: how can I pass this custom kryo configuration to
> the spark kryo serializer / kryo registrator?
> I've had a look at the code but I am still fairly new to Scala and I can't
> see how I would do this. In the worst case, could I override the newKryo
> method and put my configuration there? It appears to me that method is the
> one where the kryo instance is created.
>
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
> On Tue, Nov 25, 2014 at 2:38 PM, Simone Franzini 
> wrote:
>
>> I am running into the following NullPointerException:
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>> Serialization trace:
>> underlying (scala.collection.convert.Wrappers$JListWrapper)
>> myArrayField (MyCaseClass)
>> at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>
>> I have been running into similar issues when using avro classes, that I
>> was able to resolve by registering them with a Kryo serializer that uses
>> chill-avro. However, in this case the field is in a case class and it seems
>> that registering the class does not help.
>>
>> I found this stack overflow that seems to be relevant:
>>
>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>> I have this line of code translated to Scala, that supposedly solves the
>> issue:
>>
>> val kryo = new Kryo()
>> kryo.getInstantiatorStrategy().asInstanceOf[Kryo.DefaultInstantiatorStrategy].setFallbackInstantiatorStrategy(new
>> StdInstantiatorStrategy())
>>
>> However, I am not sure where this line should be placed to take effect.
>>
>> I already have the following, should it go somewhere in here?
>> class MyRegistrator extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo) {
>> kryo.register(...)
>> }
>> }
>>
>>
>> Simone Franzini, PhD
>>
>> http://www.linkedin.com/in/simonefranzini
>>
>
>


Where can you get nightly builds?

2014-12-06 Thread Simone Franzini
I recently read in the mailing list that there are now nightly builds
available. However, I can't find them anywhere. Is this really done? If so,
where can I get them?

Thanks,
Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: NullPointerException When Reading Avro Sequence Files

2014-12-09 Thread Simone Franzini
Hi Cristovao,

I have seen a very similar issue that I have posted about in this thread:
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
I think your main issue here is somewhat similar, in that the MapWrapper
Scala class is not registered. This gets registered by the Twitter
chill-scala AllScalaRegistrar class that you are currently not using.

As far as I understand, in order to use Avro with Spark, you also have to
use Kryo. This means you have to use the Spark KryoSerializer. This in turn
uses Twitter chill. I posted the basic code that I am using here:

http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-read-this-avro-file-using-spark-amp-scala-td19400.html#a19491

Maybe there is a simpler solution to your problem but I am not that much of
an expert yet. I hope this helps.

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Tue, Dec 9, 2014 at 8:50 AM, Cristovao Jose Domingues Cordeiro <
cristovao.corde...@cern.ch> wrote:

>  Hi Simone,
>
> thanks but I don't think that's it.
> I've tried several libraries within the --jar argument. Some do give what
> you said. But other times (when I put the right version I guess) I get the
> following:
> 14/12/09 15:45:54 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
> java.io.NotSerializableException:
> scala.collection.convert.Wrappers$MapWrapper
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>
>
> Which is odd since I am reading a Avro I wrote...with the same piece of
> code:
> https://gist.github.com/MLnick/5864741781b9340cb211
>
>  Cumprimentos / Best regards,
> Cristóvão José Domingues Cordeiro
> IT Department - 28/R-018
> CERN
>--
> *From:* Simone Franzini [captainfr...@gmail.com]
> *Sent:* 06 December 2014 15:48
> *To:* Cristovao Jose Domingues Cordeiro
> *Subject:* Re: NullPointerException When Reading Avro Sequence Files
>
>   java.lang.IncompatibleClassChangeError: Found interface
> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>
>  That is a sign that you are mixing up versions of Hadoop. This is
> particularly an issue when dealing with AVRO. If you are using Hadoop 2,
> you will need to get the hadoop 2 version of avro-mapred. In Maven you
> would do this with the  hadoop2  tag.
>
>  Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
> On Fri, Dec 5, 2014 at 3:52 AM, cjdc  wrote:
>
>> Hi all,
>>
>> I've tried the above example on Gist, but it doesn't work (at least for
>> me).
>> Did anyone get this:
>> 14/12/05 10:44:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
>> 0)
>> java.lang.IncompatibleClassChangeError: Found interface
>> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>> at
>>
>> org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
>> at
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:115)
>> at
>> org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)
>> at
>> org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> at org.apache.spark.scheduler.Task.run(Task.scala:54)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 14/12/05 10:44:40 ERROR ExecutorUncaughtExceptionHandler: Uncaught
>> exception
>> in thread Thread[Executor task launch worker-0,5,main]
>> java.lang.IncompatibleClassChangeError: Found interface
>> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>> at
>>
>> org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
>> at
>> org.apache.spark.rdd.NewHa

Re: NullPointerException When Reading Avro Sequence Files

2014-12-09 Thread Simone Franzini
You can use this Maven dependency:


com.twitter
chill-avro
0.4.0


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Tue, Dec 9, 2014 at 9:53 AM, Cristovao Jose Domingues Cordeiro <
cristovao.corde...@cern.ch> wrote:

>  Thanks for the reply!
>
> I've tried in fact your code. But I lack the twiter chill package and I
> can not find it online. So I am now trying this
> http://spark.apache.org/docs/latest/tuning.html#data-serialization . But
> in case I can't do it, could you tell me where to get that Twiter package
> you used?
>
> Thanks
>
>  Cumprimentos / Best regards,
> Cristóvão José Domingues Cordeiro
> IT Department - 28/R-018
> CERN
>--
> *From:* Simone Franzini [captainfr...@gmail.com]
> *Sent:* 09 December 2014 16:42
> *To:* Cristovao Jose Domingues Cordeiro; user
>
> *Subject:* Re: NullPointerException When Reading Avro Sequence Files
>
>   Hi Cristovao,
>
> I have seen a very similar issue that I have posted about in this thread:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>  I think your main issue here is somewhat similar, in that the MapWrapper
> Scala class is not registered. This gets registered by the Twitter
> chill-scala AllScalaRegistrar class that you are currently not using.
>
>  As far as I understand, in order to use Avro with Spark, you also have
> to use Kryo. This means you have to use the Spark KryoSerializer. This in
> turn uses Twitter chill. I posted the basic code that I am using here:
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-read-this-avro-file-using-spark-amp-scala-td19400.html#a19491
>
>  Maybe there is a simpler solution to your problem but I am not that much
> of an expert yet. I hope this helps.
>
>  Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
> On Tue, Dec 9, 2014 at 8:50 AM, Cristovao Jose Domingues Cordeiro <
> cristovao.corde...@cern.ch> wrote:
>
>>  Hi Simone,
>>
>> thanks but I don't think that's it.
>> I've tried several libraries within the --jar argument. Some do give what
>> you said. But other times (when I put the right version I guess) I get the
>> following:
>> 14/12/09 15:45:54 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
>> 0)
>> java.io.NotSerializableException:
>> scala.collection.convert.Wrappers$MapWrapper
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> at
>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>>
>>
>> Which is odd since I am reading a Avro I wrote...with the same piece of
>> code:
>> https://gist.github.com/MLnick/5864741781b9340cb211
>>
>>  Cumprimentos / Best regards,
>> Cristóvão José Domingues Cordeiro
>> IT Department - 28/R-018
>> CERN
>>--
>> *From:* Simone Franzini [captainfr...@gmail.com]
>> *Sent:* 06 December 2014 15:48
>> *To:* Cristovao Jose Domingues Cordeiro
>> *Subject:* Re: NullPointerException When Reading Avro Sequence Files
>>
>>java.lang.IncompatibleClassChangeError: Found interface
>> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>>
>>  That is a sign that you are mixing up versions of Hadoop. This is
>> particularly an issue when dealing with AVRO. If you are using Hadoop 2,
>> you will need to get the hadoop 2 version of avro-mapred. In Maven you
>> would do this with the  hadoop2  tag.
>>
>>  Simone Franzini, PhD
>>
>> http://www.linkedin.com/in/simonefranzini
>>
>> On Fri, Dec 5, 2014 at 3:52 AM, cjdc  wrote:
>>
>>> Hi all,
>>>
>>> I've tried the above example on Gist, but it doesn't work (at least for
>>> me).
>>> Did anyone get this:
>>> 14/12/05 10:44:40 ERROR Executor: Exception in task 0.0 in stage 0.0
>>> (TID 0)
>>> java.lang.IncompatibleClassChangeError: Found interface
>>> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>>> at
>>>
>>> org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
>>> at
>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:115)
>>> at
>>> org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)
>>> at
>>> org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:

Re: NullPointerException When Reading Avro Sequence Files

2014-12-15 Thread Simone Franzini
To me this looks like an internal error to the REPL. I am not sure what is
causing that.
Personally I never use the REPL, can you try typing up your program and
running it from an IDE or spark-submit and see if you still get the same
error?

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Mon, Dec 15, 2014 at 4:54 PM, Cristovao Jose Domingues Cordeiro <
cristovao.corde...@cern.ch> wrote:
>
>  Sure, thanks:
> warning: there were 1 deprecation warning(s); re-run with -deprecation for
> details
> java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
> at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
> at org.apache.hadoop.mapreduce.Job.toString(Job.java:462)
> at
> scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
> at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
> at .(:10)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:846)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1119)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:672)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:703)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:667)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:819)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:864)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:776)
> at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:619)
> at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:627)
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:632)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:959)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:907)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:907)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:907)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1002)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
>
> Could something you omitted in your snippet be chaining this exception?
>
>  Cumprimentos / Best regards,
> Cristóvão José Domingues Cordeiro
> IT Department - 28/R-018
> CERN
>--
> *From:* Simone Franzini [captainfr...@gmail.com]
> *Sent:* 15 December 2014 16:52
>
> *To:* Cristovao Jose Domingues Cordeiro
> *Subject:* Re: NullPointerException When Reading Avro Sequence Files
>
>   Ok, I have no idea what that is. That appears to be an internal Spark
> exception. Maybe if you can post the entire stack trace it would give some
> more details to understand what is going on.
>
>  Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
> On Mon, Dec 15, 2014 at 4:50 PM, Cristovao Jose Domingues Cordeiro <
> cristovao.corde...@cern.ch> wrote:
>>
>>  Hi,
>>
>> thanks for that.
>> But yeah the 2nd line is an exception. jobread is not created.
>>
>>  Cumprimentos / Best regards,
>> Cristóvão José Domingues Cordeiro
>> IT Department - 28/R-018
>> CERN
>>--
>> *From:* Simone Franzini [captainfr...@gmail.com

updateStateByKey: cleaning up state for keys not in current window

2015-01-09 Thread Simone Franzini
I know that in order to clean up the state for a key I have to return None
when I call updateStateByKey. However, as far as I understand,
updateStateByKey only gets called for new keys (i.e. keys in current
batch), not for all keys in the DStream.
So, how can I clear the state for those keys in this case? Or, in other
words, how can I clear the state for a key when Seq[V] is empty?


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini