Re: "Sharing" dataframes...

2017-06-21 Thread Rick Moritz
Keeping it inside the same program/SparkContext is the most performant
solution, since you can avoid serialization and deserialization.
In-memory persistence between jobs involves a memory copy, uses a lot of RAM
and invokes serialization and deserialization. Technologies that can help
you do that easily are Ignite (as mentioned) but also Alluxio, Cassandra
with in-memory tables and a memory-backed HDFS directory (see tiered
storage).
Although Livy and job-server provide the functionality of exposing a
single SparkContext to multiple programs, I would recommend you build your
own framework for integrating different jobs, since many features you may
need aren't present yet, while others may cause issues due to lack of
maturity. Artificially splitting jobs is in general a bad idea, since it
breaks the DAG and thus prevents some potential push-down optimizations.
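
A minimal sketch of that single-SparkContext approach (Spark 2.x assumed; the
paths, column names and the "A"/"B" computations are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("shared-dataframes").getOrCreate()

// "Programs" A and B become stages inside the same context, so their
// results never have to leave the executors.
val resultA = spark.read.parquet("/data/setA").groupBy("key").count()
val resultB = spark.read.parquet("/data/setB").groupBy("key").count()
resultA.persist(StorageLevel.MEMORY_ONLY)
resultB.persist(StorageLevel.MEMORY_ONLY)

// "Program" C reuses the cached results without a round trip to disk.
val combined = resultA.join(resultB, "key")
combined.show()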

On Tue, Jun 20, 2017 at 10:17 PM, Jean Georges Perrin  wrote:

> Thanks Vadim & Jörn... I will look into those.
>
> jg
>
> On Jun 20, 2017, at 2:12 PM, Vadim Semenov 
> wrote:
>
> You can launch one permanent spark context and then execute your jobs
> within the context. And since they'll be running in the same context, they
> can share data easily.
>
> These two projects provide the functionality that you need:
> https://github.com/spark-jobserver/spark-jobserver#
> persistent-context-mode---faster--required-for-related-jobs
> https://github.com/cloudera/livy#post-sessions
>
> On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin  wrote:
>
>> Hey,
>>
>> Here is my need: program A does something on a set of data and produces
>> results, program B does that on another set, and finally, program C
>> combines the data of A and B. Of course, the easy way is to dump all on
>> disk after A and B are done, but I wanted to avoid this.
>>
>> I was thinking of creating a temp view, but I do not really like the temp
>> aspect of it ;). Any idea (they are all worth sharing)
>>
>> jg
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


VS: Using Spark as a simulator

2017-06-21 Thread Esa Heikkinen

Hi


Thanks for the answer.


I think my simulator includes a lot of parallel state machines, and each of them 
generates a log file (with timestamps). Finally, all events (rows) of all the log 
files should be combined in time order into one very large log file. In practice the 
combined log file can also be split into smaller ones.


What transformation or action functions can I use in Spark for that purpose?

Are there any code samples (Python or Scala) for that?

Regards

Esa Heikkinen


Lähettäjä: Jörn Franke 
Lähetetty: 20. kesäkuuta 2017 17:12
Vastaanottaja: Esa Heikkinen
Kopio: user@spark.apache.org
Aihe: Re: Using Spark as a simulator

It is fine, but you have to design it so that the generated rows are written in large 
blocks for optimal performance.
The trickiest part of data generation is the conceptual part, such as the 
probabilistic distributions etc.
You also have to check that you use a good random generator; for some cases 
the Java built-in one might not be good enough.

On 20. Jun 2017, at 16:04, Esa Heikkinen 
mailto:esa.heikki...@student.tut.fi>> wrote:


Hi


Spark is a data analyzer, but would it be possible to use Spark as a data 
generator or simulator?


My simulation can be very large, and I think a parallelized simulation using 
Spark (in the cloud) could work.

Is that a good or bad idea?


Regards

Esa Heikkinen
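
As an illustration of Jörn's two points above (writing rows in large blocks and
using a decent random generator), here is a rough data-generation sketch; the
row counts, schema, seed and output path are made up:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("data-generator").getOrCreate()
import spark.implicits._

val numPartitions = 200
val rowsPerPartition = 500000

val generated = spark.sparkContext
  .parallelize(0 until numPartitions, numPartitions)
  .flatMap { pid =>
    // One seeded generator per partition instead of a shared java.util.Random.
    val rnd = new java.util.SplittableRandom(42L + pid)
    Iterator.range(0, rowsPerPartition).map { _ =>
      (pid, rnd.nextLong(), rnd.nextDouble())
    }
  }
  .toDF("machineId", "eventId", "value")

// Few large output files rather than many small ones.
generated.coalesce(20).write.parquet("/tmp/generated")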



Saving RDD as Kryo (broken in 2.1)

2017-06-21 Thread Alexander Krasheninnikov
Hi, all!
I have code that serializes an RDD with Kryo and saves it as a sequence file. It
works fine in 1.5.1, but after switching to 2.1.1 it does not work.

I am trying to serialize an RDD of Tuple2<> (obtained from a PairRDD).

   1. The RDD consists of different heterogeneous objects (aggregates, like
   HLL, QTree, Min, Max, etc.)
   2. The save is performed within streaming
   3. The read is performed outside of streaming (another app)
   4. Suspected that such an error could be due to custom serializers - turned
   them off, but the errors still exist
   5. Tried disabling references in Kryo (since I saw an error while
   resolving references) - got a StackOverflowError and significant performance
   degradation
   6. Implementing Serializable/Externalizable is not a solution,
   unfortunately.

Expected behavior:

saveAsObjectFile/loadObjectFile are symmetric, and it is possible to load a
previously saved RDD.

Code of save/load:

import java.io.ByteArrayOutputStream

import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

object KryoFile {

  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Used to write as an object file using Kryo serialization.
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
    val kryoSerializer = new KryoSerializer(rdd.context.getConf)

    rdd.context.setJobDescription("Saving to path " + path)
    rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => {
        // initializes Kryo and calls your registrator class
        var kryo = THREAD_LOCAL_CACHE.get()
        if (null == kryo) {
          kryo = kryoSerializer.newKryo()
          THREAD_LOCAL_CACHE.set(kryo)
        }

        // convert data to bytes
        val bao = new ByteArrayOutputStream()
        val output = kryoSerializer.newKryoOutput()
        output.setOutputStream(bao)
        kryo.writeClassAndObject(output, splitArray)
        output.close()
        kryo.reset()

        // We are ignoring the key field of the sequence file
        val byteWritable = new BytesWritable(bao.toByteArray)
        (NullWritable.get(), byteWritable)
      }).saveAsSequenceFile(path)
  }

  /*
   * Method to read from an object file which was saved in Kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = {
    val kryoSerializer = new KryoSerializer(sc.getConf)

    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
      .flatMap(x => {

        var kryo = THREAD_LOCAL_CACHE.get()
        if (null == kryo) {
          kryo = kryoSerializer.newKryo()
          THREAD_LOCAL_CACHE.set(kryo)
        }

        val input = new Input()
        input.setBuffer(x._2.getBytes)
        val data = kryo.readClassAndObject(input)
        kryo.reset()
        val dataObject = data.asInstanceOf[Array[T]]
        dataObject
      })

  }
}
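
For reference, the expected symmetric behavior could be checked with a minimal
round trip like the following (sc is an existing SparkContext; the path and the
plain tuples standing in for the HLL/QTree aggregates are illustrative only):

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2)))
KryoFile.saveAsObjectFile(rdd, "/tmp/kryo-test")
val loaded = KryoFile.loadObjectFile[(String, Int)](sc, "/tmp/kryo-test")
assert(loaded.count() == rdd.count())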


When trying to deserialize, I got such errors:
17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID 14)
java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID 12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(Def

JDBC RDD Timestamp Parsing Issue

2017-06-21 Thread Aviral Agarwal
Hi,

I am using JDBC RDD to read from a MySQL RDBMS.
My spark job fails with the below error :

java.sql.SQLException: Value '-00-00 00:00:00.000' can not be
represented as java.sql.Timestamp


Now, instead of the whole job failing, I want to skip this record and
continue processing the rest.
Any leads on how that can be done?


Thanks and Regards,
Aviral Agarwal


gfortran runtime library for Spark

2017-06-21 Thread Saroj C
Dear All,
 Can you please let me know if the gfortran runtime library is still required 
for Spark 2.1 for better performance? Note: I am using the Java APIs for 
Spark.

Thanks & Regards
Saroj 




RE: Using Spark as a simulator

2017-06-21 Thread Mahesh Sawaiker
Spark can help you to create one large file if needed, but HDFS itself will 
provide an abstraction over such things, so that is a trivial problem if anything.
If you have Spark installed, then you can use spark-shell to try a few samples 
and build from there. If you can collect all the files in a folder, then Spark 
can read all the files from there. The programming guide below has enough 
information to get started.

https://spark.apache.org/docs/latest/programming-guide.html
All of Spark's file-based input methods, including textFile, support running on 
directories, compressed files, and wildcards as well. For example, you can use 
textFile("/my/directory"), textFile("/my/directory/*.txt"), and 
textFile("/my/directory/*.gz").

After reading the files you can transform them using the map function, which will split 
each individual line and possibly create a Scala object. This way you will get an RDD 
of Scala objects, which you can then process with functional/set operators.

You would want to read about PairRDDs.
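
A rough sketch of that read, map and sort flow for the time-ordered merge (the
glob path, log line format and timestamp parsing below are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("merge-logs").getOrCreate()
val sc = spark.sparkContext

// Read every log file written by the state machines in one go.
val lines = sc.textFile("/simulator/logs/*.log")

// Assume each line starts with an epoch-millis timestamp, e.g. "1498098896000 EVENT ...".
val events = lines.map { line => (line.split(" ", 2)(0).toLong, line) }

// A global sort by timestamp yields the combined, time-ordered log; the
// output part files are themselves ordered, so they can be kept split.
events.sortByKey().values.saveAsTextFile("/simulator/combined")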

From: Esa Heikkinen [mailto:esa.heikki...@student.tut.fi]
Sent: Wednesday, June 21, 2017 1:12 PM
To: Jörn Franke
Cc: user@spark.apache.org
Subject: VS: Using Spark as a simulator




Hi



Thanks for the answer.


I think my simulator includes a lot of parallel state machines and each of them 
generates log file (with timestamps). Finally all events (rows) of all log 
files should combine as time order to (one) very huge log file. Practically the 
combined huge log file can also be split into smaller ones.


What transformation or action functions can i use in Spark for that purpose ?

Or are there exist some code sample (Python or Scala) about that ?

Regards

Esa Heikkinen


Lähettäjä: Jörn Franke mailto:jornfra...@gmail.com>>
Lähetetty: 20. kesäkuuta 2017 17:12
Vastaanottaja: Esa Heikkinen
Kopio: user@spark.apache.org
Aihe: Re: Using Spark as a simulator

It is fine, but you have to design it that generated rows are written in large 
blocks for optimal performance.
The most tricky part with data generation is the conceptual part, such as 
probabilistic distribution etc
You have to check as well that you use a good random generator, for some cases 
the Java internal might be not that well.

On 20. Jun 2017, at 16:04, Esa Heikkinen 
mailto:esa.heikki...@student.tut.fi>> wrote:

Hi



Spark is a data analyzer, but would it be possible to use Spark as a data 
generator or simulator ?



My simulation can be very huge and i think a parallelized simulation using by 
Spark (cloud) could work.

Is that good or bad idea ?



Regards

Esa Heikkinen




RE: JDBC RDD Timestamp Parsing Issue

2017-06-21 Thread Mahesh Sawaiker
This has to do with how you are creating the timestamp object from the 
result set (I guess).
If you can provide more code it will help, but you could surround the parsing 
code with a try/catch and then just ignore the exception.

From: Aviral Agarwal [mailto:aviral12...@gmail.com]
Sent: Wednesday, June 21, 2017 2:37 PM
To: user@spark.apache.org
Subject: JDBC RDD Timestamp Parsing Issue

Hi,

I am using JDBC RDD to read from a MySQL RDBMS.
My spark job fails with the below error :


java.sql.SQLException: Value '-00-00 00:00:00.000' can not be represented 
as java.sql.Timestamp


Now instead of the whole job failing I want to skip this record and continue 
processing the rest.
Any leads on how that can be done ?


Thanks and Regards,
Aviral Agarwal


RE: JDBC RDD Timestamp Parsing Issue

2017-06-21 Thread Aviral Agarwal
The exception is happening in JDBC RDD code where getNext() is called to
get the next row.
I do not have access to the result set. I am operating on a DataFrame.

Thanks and Regards,
Aviral Agarwal

On Jun 21, 2017 17:19, "Mahesh Sawaiker" 
wrote:

> This has to do with how you are creating the timestamp object from the
> resultset ( I guess).
>
> If you can provide more code it will help, but you could surround the
> parsing code with a try catch and then just ignore the exception.
>
>
>
> *From:* Aviral Agarwal [mailto:aviral12...@gmail.com]
> *Sent:* Wednesday, June 21, 2017 2:37 PM
> *To:* user@spark.apache.org
> *Subject:* JDBC RDD Timestamp Parsing Issue
>
>
>
> Hi,
>
>
>
> I am using JDBC RDD to read from a MySQL RDBMS.
>
> My spark job fails with the below error :
>
>
>
> java.sql.SQLException: Value '-00-00 00:00:00.000' can not be represented 
> as java.sql.Timestamp
>
>
>
> Now instead of the whole job failing I want to skip this record and
> continue processing the rest.
> Any leads on how that can be done ?
>
>
> Thanks and Regards,
> Aviral Agarwal
>


Re: JDBC RDD Timestamp Parsing Issue

2017-06-21 Thread Eduardo Mello
You can add "?zeroDateTimeBehavior=convertToNull" to the connection string.
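
With the DataFrame JDBC reader that parameter simply goes into the JDBC URL,
for example (host, database, table and credentials are placeholders; the
parameter spelling shown is the MySQL Connector/J 5.x one):

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb?zeroDateTimeBehavior=convertToNull")
  .option("dbtable", "events")
  .option("user", "spark")
  .option("password", "***")
  .load()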

On Wed, Jun 21, 2017 at 9:04 AM, Aviral Agarwal 
wrote:

> The exception is happening in JDBC RDD code where getNext() is called to
> get the next row.
> I do not have access to the result set. I am operating on a DataFrame.
>
> Thanks and Regards,
> Aviral Agarwal
>
> On Jun 21, 2017 17:19, "Mahesh Sawaiker" 
> wrote:
>
>> This has to do with how you are creating the timestamp object from the
>> resultset ( I guess).
>>
>> If you can provide more code it will help, but you could surround the
>> parsing code with a try catch and then just ignore the exception.
>>
>>
>>
>> *From:* Aviral Agarwal [mailto:aviral12...@gmail.com]
>> *Sent:* Wednesday, June 21, 2017 2:37 PM
>> *To:* user@spark.apache.org
>> *Subject:* JDBC RDD Timestamp Parsing Issue
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am using JDBC RDD to read from a MySQL RDBMS.
>>
>> My spark job fails with the below error :
>>
>>
>>
>> java.sql.SQLException: Value '-00-00 00:00:00.000' can not be 
>> represented as java.sql.Timestamp
>>
>>
>>
>> Now instead of the whole job failing I want to skip this record and
>> continue processing the rest.
>> Any leads on how that can be done ?
>>
>>
>> Thanks and Regards,
>> Aviral Agarwal
>>
>


Re: JDBC RDD Timestamp Parsing Issue

2017-06-21 Thread Aviral Agarwal
This works. Thanks!

- Aviral Agarwal

On Wed, Jun 21, 2017 at 6:07 PM, Eduardo Mello  wrote:

> You can add "?zeroDateTimeBehavior=convertToNull" to the connection
> string.
>
> On Wed, Jun 21, 2017 at 9:04 AM, Aviral Agarwal 
> wrote:
>
>> The exception is happening in JDBC RDD code where getNext() is called to
>> get the next row.
>> I do not have access to the result set. I am operating on a DataFrame.
>>
>> Thanks and Regards,
>> Aviral Agarwal
>>
>> On Jun 21, 2017 17:19, "Mahesh Sawaiker" 
>> wrote:
>>
>>> This has to do with how you are creating the timestamp object from the
>>> resultset ( I guess).
>>>
>>> If you can provide more code it will help, but you could surround the
>>> parsing code with a try catch and then just ignore the exception.
>>>
>>>
>>>
>>> *From:* Aviral Agarwal [mailto:aviral12...@gmail.com]
>>> *Sent:* Wednesday, June 21, 2017 2:37 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* JDBC RDD Timestamp Parsing Issue
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I am using JDBC RDD to read from a MySQL RDBMS.
>>>
>>> My spark job fails with the below error :
>>>
>>>
>>>
>>> java.sql.SQLException: Value '-00-00 00:00:00.000' can not be 
>>> represented as java.sql.Timestamp
>>>
>>>
>>>
>>> Now instead of the whole job failing I want to skip this record and
>>> continue processing the rest.
>>> Any leads on how that can be done ?
>>>
>>>
>>> Thanks and Regards,
>>> Aviral Agarwal
>>>
>>
>


Re: "Sharing" dataframes...

2017-06-21 Thread Michael Mior
This is a puzzling suggestion to me. It's unclear what features the OP
needs, so it's really hard to say whether Livy or job-server aren't
sufficient. It's true that neither are particularly mature, but they're
much more mature than a homemade project which hasn't started yet.

That said, I'm not very familiar with either project, so perhaps there are
some big concerns I'm not aware of.

--
Michael Mior
mm...@apache.org

2017-06-21 3:19 GMT-04:00 Rick Moritz :

> Keeping it inside the same program/SparkContext is the most performant
> solution, since you can avoid serialization and deserialization.
> In-Memory-Persistance between jobs involves a memcopy, uses a lot of RAM
> and invokes serialization and deserialization. Technologies that can help
> you do that easily are Ignite (as mentioned) but also Alluxio, Cassandra
> with in-memory tables and a memory-backed HDFS-directory (see tiered
> storage).
> Although livy and job-server provide the functionality of providing a
> single SparkContext to mutliple programs, I would recommend you build your
> own framework for integrating different jobs, since many features you may
> need aren't present yet, while others may cause issues due to lack of
> maturity. Artificially splitting jobs is in general a bad idea, since it
> breaks the DAG and thus prevents some potential push-down optimizations.
>
> On Tue, Jun 20, 2017 at 10:17 PM, Jean Georges Perrin  wrote:
>
>> Thanks Vadim & Jörn... I will look into those.
>>
>> jg
>>
>> On Jun 20, 2017, at 2:12 PM, Vadim Semenov 
>> wrote:
>>
>> You can launch one permanent spark context and then execute your jobs
>> within the context. And since they'll be running in the same context, they
>> can share data easily.
>>
>> These two projects provide the functionality that you need:
>> https://github.com/spark-jobserver/spark-jobserver#persisten
>> t-context-mode---faster--required-for-related-jobs
>> https://github.com/cloudera/livy#post-sessions
>>
>> On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin  wrote:
>>
>>> Hey,
>>>
>>> Here is my need: program A does something on a set of data and produces
>>> results, program B does that on another set, and finally, program C
>>> combines the data of A and B. Of course, the easy way is to dump all on
>>> disk after A and B are done, but I wanted to avoid this.
>>>
>>> I was thinking of creating a temp view, but I do not really like the
>>> temp aspect of it ;). Any idea (they are all worth sharing)
>>>
>>> jg
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>


Re: "Sharing" dataframes...

2017-06-21 Thread Jean Georges Perrin
I have looked at Livy in the (very recent) past and it will not do the 
trick for me. It seems pretty greedy in terms of resources (or at least that 
was our experience). I will investigate how job-server could do the trick.

(on a side note I tried to find a paper on memory lifecycle within Spark but 
was not very successful, maybe someone has a link to spare.)

My need is to keep one/several dataframes in memory (well, within Spark) so 
it/they can be reused at a later time, without persisting it/them to disk 
(unless Spark wants to, of course).



> On Jun 21, 2017, at 10:47 AM, Michael Mior  wrote:
> 
> This is a puzzling suggestion to me. It's unclear what features the OP needs, 
> so it's really hard to say whether Livy or job-server aren't sufficient. It's 
> true that neither are particularly mature, but they're much more mature than 
> a homemade project which hasn't started yet.
> 
> That said, I'm not very familiar with either project, so perhaps there are 
> some big concerns I'm not aware of.
> 
> --
> Michael Mior
> mm...@apache.org 
> 
> 2017-06-21 3:19 GMT-04:00 Rick Moritz  >:
> Keeping it inside the same program/SparkContext is the most performant 
> solution, since you can avoid serialization and deserialization. 
> In-Memory-Persistance between jobs involves a memcopy, uses a lot of RAM and 
> invokes serialization and deserialization. Technologies that can help you do 
> that easily are Ignite (as mentioned) but also Alluxio, Cassandra with 
> in-memory tables and a memory-backed HDFS-directory (see tiered storage).
> Although livy and job-server provide the functionality of providing a single 
> SparkContext to mutliple programs, I would recommend you build your own 
> framework for integrating different jobs, since many features you may need 
> aren't present yet, while others may cause issues due to lack of maturity. 
> Artificially splitting jobs is in general a bad idea, since it breaks the DAG 
> and thus prevents some potential push-down optimizations.
> 
> On Tue, Jun 20, 2017 at 10:17 PM, Jean Georges Perrin  > wrote:
> Thanks Vadim & Jörn... I will look into those.
> 
> jg
> 
>> On Jun 20, 2017, at 2:12 PM, Vadim Semenov > > wrote:
>> 
>> You can launch one permanent spark context and then execute your jobs within 
>> the context. And since they'll be running in the same context, they can 
>> share data easily.
>> 
>> These two projects provide the functionality that you need:
>> https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs
>> https://github.com/cloudera/livy#post-sessions
>> 
>> 
>> On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin > > wrote:
>> Hey,
>> 
>> Here is my need: program A does something on a set of data and produces 
>> results, program B does that on another set, and finally, program C combines 
>> the data of A and B. Of course, the easy way is to dump all on disk after A 
>> and B are done, but I wanted to avoid this.
>> 
>> I was thinking of creating a temp view, but I do not really like the temp 
>> aspect of it ;). Any idea (they are all worth sharing)
>> 
>> jg
>> 
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
>> 
>> 
> 
> 
> 



Re: "Sharing" dataframes...

2017-06-21 Thread Gene Pang
Hi Jean,

As others have mentioned, you can use Alluxio with Spark dataframes to
keep the data in memory, and for other jobs to read them from memory again.

Hope this helps,
Gene
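
For example, one job can park the DataFrame in Alluxio and a later job (with its
own SparkContext) can read it back, roughly like this; the Alluxio master
address and path are placeholders:

// Job 1: write the result to Alluxio's memory tier.
resultA.write.parquet("alluxio://alluxio-master:19998/shared/resultA")

// Job 2: read it back; as long as the data stays in Alluxio's MEM tier,
// no disk round trip is involved.
val resultA = spark.read.parquet("alluxio://alluxio-master:19998/shared/resultA")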

On Wed, Jun 21, 2017 at 8:08 AM, Jean Georges Perrin  wrote:

> I have looked at Livy in the (very recent past) past and it will not do
> the trick for me. It seems pretty greedy in terms of resources (or at least
> that was our experience). I will investigate how job-server could do the
> trick.
>
> (on a side note I tried to find a paper on memory lifecycle within Spark
> but was not very successful, maybe someone has a link to spare.)
>
> My need is to keep one/several dataframes in memory (well, within Spark)
> so it/they can be reused at a later time, without persisting it/them to
> disk (unless Spark wants to, of course).
>
>
>
> On Jun 21, 2017, at 10:47 AM, Michael Mior  wrote:
>
> This is a puzzling suggestion to me. It's unclear what features the OP
> needs, so it's really hard to say whether Livy or job-server aren't
> sufficient. It's true that neither are particularly mature, but they're
> much more mature than a homemade project which hasn't started yet.
>
> That said, I'm not very familiar with either project, so perhaps there are
> some big concerns I'm not aware of.
>
> --
> Michael Mior
> mm...@apache.org
>
> 2017-06-21 3:19 GMT-04:00 Rick Moritz :
>
>> Keeping it inside the same program/SparkContext is the most performant
>> solution, since you can avoid serialization and deserialization.
>> In-Memory-Persistance between jobs involves a memcopy, uses a lot of RAM
>> and invokes serialization and deserialization. Technologies that can help
>> you do that easily are Ignite (as mentioned) but also Alluxio, Cassandra
>> with in-memory tables and a memory-backed HDFS-directory (see tiered
>> storage).
>> Although livy and job-server provide the functionality of providing a
>> single SparkContext to mutliple programs, I would recommend you build your
>> own framework for integrating different jobs, since many features you may
>> need aren't present yet, while others may cause issues due to lack of
>> maturity. Artificially splitting jobs is in general a bad idea, since it
>> breaks the DAG and thus prevents some potential push-down optimizations.
>>
>> On Tue, Jun 20, 2017 at 10:17 PM, Jean Georges Perrin 
>> wrote:
>>
>>> Thanks Vadim & Jörn... I will look into those.
>>>
>>> jg
>>>
>>> On Jun 20, 2017, at 2:12 PM, Vadim Semenov 
>>> wrote:
>>>
>>> You can launch one permanent spark context and then execute your jobs
>>> within the context. And since they'll be running in the same context, they
>>> can share data easily.
>>>
>>> These two projects provide the functionality that you need:
>>> https://github.com/spark-jobserver/spark-jobserver#persisten
>>> t-context-mode---faster--required-for-related-jobs
>>> https://github.com/cloudera/livy#post-sessions
>>>
>>> On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin 
>>> wrote:
>>>
 Hey,

 Here is my need: program A does something on a set of data and produces
 results, program B does that on another set, and finally, program C
 combines the data of A and B. Of course, the easy way is to dump all on
 disk after A and B are done, but I wanted to avoid this.

 I was thinking of creating a temp view, but I do not really like the
 temp aspect of it ;). Any idea (they are all worth sharing)

 jg



 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>>
>>
>
>


Re: Do we anything for Deep Learning in Spark?

2017-06-21 Thread Suzen, Mehmet
There is a BigDL project:
https://github.com/intel-analytics/BigDL

On 20 June 2017 at 16:17, Jules Damji  wrote:
> And we will having a webinar on July 27 going into some more  details. Stay
> tuned.
>
> Cheers
> Jules
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Jun 20, 2017, at 7:00 AM, Michael Mior  wrote:
>
> It's still in the early stages, but check out Deep Learning Pipelines from
> Databricks
>
> https://github.com/databricks/spark-deep-learning
>
> --
> Michael Mior
> mm...@apache.org
>
> 2017-06-20 0:36 GMT-04:00 Gaurav1809 :
>>
>> Hi All,
>>
>> Similar to how we have machine learning library called ML, do we have
>> anything for deep learning?
>> If yes, please share the details. If not then what should be the approach?
>>
>> Thanks and regards,
>> Gaurav Pandya
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-anything-for-Deep-Learning-in-Spark-tp28772.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: "Sharing" dataframes...

2017-06-21 Thread Pierce Lamb
Hi Jean,

Since many in this thread have mentioned datastores from what I would call
the "Spark datastore ecosystem" I thought I would link you to a
StackOverflow answer I posted awhile back that tried to capture the
majority of this ecosystem. Most would claim to allow you to do something
like you're describing in your original email once connected to Spark:

https://stackoverflow.com/questions/39650298/how-to-save-insert-each-dstream-into-a-permanent-table/39753976#39753976

Regarding Rick Moritz's reply, SnappyData, a member of this ecosystem,
avoids the latency-intensive serialization steps he describes by
integrating the database and Spark such that they use the same JVM/block
manager (you can think of it as an in-memory SQL database replacing Spark's
native cache).

Hope this helps,

Pierce

On Wed, Jun 21, 2017 at 8:29 AM, Gene Pang  wrote:

> Hi Jean,
>
> As others have mentioned, you can use Alluxio with Spark dataframes
>  to
> keep the data in memory, and for other jobs to read them from memory again.
>
> Hope this helps,
> Gene
>
> On Wed, Jun 21, 2017 at 8:08 AM, Jean Georges Perrin  wrote:
>
>> I have looked at Livy in the (very recent past) past and it will not do
>> the trick for me. It seems pretty greedy in terms of resources (or at least
>> that was our experience). I will investigate how job-server could do the
>> trick.
>>
>> (on a side note I tried to find a paper on memory lifecycle within Spark
>> but was not very successful, maybe someone has a link to spare.)
>>
>> My need is to keep one/several dataframes in memory (well, within Spark)
>> so it/they can be reused at a later time, without persisting it/them to
>> disk (unless Spark wants to, of course).
>>
>>
>>
>> On Jun 21, 2017, at 10:47 AM, Michael Mior  wrote:
>>
>> This is a puzzling suggestion to me. It's unclear what features the OP
>> needs, so it's really hard to say whether Livy or job-server aren't
>> sufficient. It's true that neither are particularly mature, but they're
>> much more mature than a homemade project which hasn't started yet.
>>
>> That said, I'm not very familiar with either project, so perhaps there
>> are some big concerns I'm not aware of.
>>
>> --
>> Michael Mior
>> mm...@apache.org
>>
>> 2017-06-21 3:19 GMT-04:00 Rick Moritz :
>>
>>> Keeping it inside the same program/SparkContext is the most performant
>>> solution, since you can avoid serialization and deserialization.
>>> In-Memory-Persistance between jobs involves a memcopy, uses a lot of RAM
>>> and invokes serialization and deserialization. Technologies that can help
>>> you do that easily are Ignite (as mentioned) but also Alluxio, Cassandra
>>> with in-memory tables and a memory-backed HDFS-directory (see tiered
>>> storage).
>>> Although livy and job-server provide the functionality of providing a
>>> single SparkContext to mutliple programs, I would recommend you build your
>>> own framework for integrating different jobs, since many features you may
>>> need aren't present yet, while others may cause issues due to lack of
>>> maturity. Artificially splitting jobs is in general a bad idea, since it
>>> breaks the DAG and thus prevents some potential push-down optimizations.
>>>
>>> On Tue, Jun 20, 2017 at 10:17 PM, Jean Georges Perrin 
>>> wrote:
>>>
 Thanks Vadim & Jörn... I will look into those.

 jg

 On Jun 20, 2017, at 2:12 PM, Vadim Semenov 
 wrote:

 You can launch one permanent spark context and then execute your jobs
 within the context. And since they'll be running in the same context, they
 can share data easily.

 These two projects provide the functionality that you need:
 https://github.com/spark-jobserver/spark-jobserver#persisten
 t-context-mode---faster--required-for-related-jobs
 https://github.com/cloudera/livy#post-sessions

 On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin 
 wrote:

> Hey,
>
> Here is my need: program A does something on a set of data and
> produces results, program B does that on another set, and finally, program
> C combines the data of A and B. Of course, the easy way is to dump all on
> disk after A and B are done, but I wanted to avoid this.
>
> I was thinking of creating a temp view, but I do not really like the
> temp aspect of it ;). Any idea (they are all worth sharing)
>
> jg
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


>>>
>>
>>
>


Unsubscribe

2017-06-21 Thread Tao Lu
Unsubscribe


Broadcasts & Storage Memory

2017-06-21 Thread Bryan Jeffrey
Hello.

Question: Do broadcast variables stored on executors count as part of
'storage memory' or other memory?

A little bit more detail:

I understand that we have two knobs to control memory allocation:
- spark.memory.fraction
- spark.memory.storageFraction

My understanding is that spark.memory.storageFraction controls the amount
of memory allocated for cached RDDs.  spark.memory.fraction controls how
much memory is allocated to Spark operations (task serialization,
operations, etc.), w/ the remainder reserved for user data structures,
Spark internal metadata, etc.  This includes the storage memory for cached
RDDs.

You end up with executor memory that looks like the following:
All memory: 0-100
Spark memory: 0-75
RDD Storage: 0-37
Other Spark: 38-75
Other Reserved: 76-100

Where do broadcast variables fall into the mix?

Regards,

Bryan Jeffrey
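
For concreteness, that layout corresponds to settings along these lines (a
sketch only; the values below mirror the example percentages above rather than
Spark's defaults, and the fractions apply to the JVM heap minus a small
reserved portion):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-layout-example")
  // fraction of usable heap given to Spark for execution + storage (the 0-75 band)
  .config("spark.memory.fraction", "0.75")
  // fraction of that Spark memory protected for storage of cached blocks (the 0-37 band)
  .config("spark.memory.storageFraction", "0.5")
  .getOrCreate()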


Re: Broadcasts & Storage Memory

2017-06-21 Thread satish lalam
My understanding is that it comes from storageFraction. Cached blocks here are
immune to eviction, so both persisted RDDs and broadcast variables sit
here. Ref



On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> Question: Do broadcast variables stored on executors count as part of
> 'storage memory' or other memory?
>
> A little bit more detail:
>
> I understand that we have two knobs to control memory allocation:
> - spark.memory.fraction
> - spark.memory.storageFraction
>
> My understanding is that spark.memory.storageFraction controls the amount
> of memory allocated for cached RDDs.  spark.memory.fraction controls how
> much memory is allocated to Spark operations (task serialization,
> operations, etc.), w/ the remainder reserved for user data structures,
> Spark internal metadata, etc.  This includes the storage memory for cached
> RDDs.
>
> You end up with executor memory that looks like the following:
> All memory: 0-100
> Spark memory: 0-75
> RDD Storage: 0-37
> Other Spark: 38-75
> Other Reserved: 76-100
>
> Where do broadcast variables fall into the mix?
>
> Regards,
>
> Bryan Jeffrey
>


Re: Broadcasts & Storage Memory

2017-06-21 Thread Bryan Jeffrey
Satish,

I agree - that was my impression too. However I am seeing a smaller amount of
storage memory used on a given executor than the amount of memory required for
my broadcast variables. I am wondering if the statistics in the UI are
incorrect or if the broadcasts are simply not a part of that storage memory
fraction.

Bryan Jeffrey


On Wed, Jun 21, 2017 at 6:48 PM -0400, "satish lalam"  wrote:

My understanding is - it comes from storageFraction. Here cached blocks are
immune to eviction - so both persisted RDDs and broadcast variables sit here. Ref

On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey  wrote:

Hello.

Question: Do broadcast variables stored on executors count as part of 'storage
memory' or other memory?

A little bit more detail:

I understand that we have two knobs to control memory allocation:
- spark.memory.fraction
- spark.memory.storageFraction

My understanding is that spark.memory.storageFraction controls the amount of
memory allocated for cached RDDs.  spark.memory.fraction controls how much
memory is allocated to Spark operations (task serialization, operations, etc.),
w/ the remainder reserved for user data structures, Spark internal metadata,
etc.  This includes the storage memory for cached RDDs.

You end up with executor memory that looks like the following:
All memory: 0-100
Spark memory: 0-75
RDD Storage: 0-37
Other Spark: 38-75
Other Reserved: 76-100

Where do broadcast variables fall into the mix?

Regards,
Bryan Jeffrey









Using YARN w/o HDFS

2017-06-21 Thread Alaa Zubaidi (PDF)
Hi,

Can we run Spark on YARN with out installing HDFS?
If yes, where would HADOOP_CONF_DIR point to?

Regards,



Re: Error while doing mvn release for spark 2.0.2 using scala 2.10

2017-06-21 Thread Kanagha Kumar
The problem I see is that the properties defined in the scala-2.10 profile
are not getting picked up by the submodules
while doing the maven release (maven 3.3.9). It works correctly while doing
mvn package though.

I also changed the pom.xml default properties to the 2.10 Scala versions and
tried the maven release.
Is it related to any *maven issue where properties are not getting
substituted correctly*? Any insights as to why this is occurring will be
very helpful.

Also, we tried mvn dependency:tree. For common/sketch, I see the following
output using the Nexus within my company:
*[WARNING] The POM for org.apache.spark:spark-tags_2.11:jar:2.0.2 is
missing, no dependency information available*

When I tried hardcoding the properties defined within the profile, it works
correctly.
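
One way to check whether the forked release build actually sees those
properties is Maven's help plugin, e.g. (illustrative invocations; adjust the
-D/-P flags to whatever your release command passes):

./build/mvn -Dscala-2.10 help:active-profiles
./build/mvn -Dscala-2.10 help:evaluate -Dexpression=scala.binary.version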

*pom.xml:*

<profile>
  <id>scala-2.10</id>
  <activation>
    <property><name>scala-2.10</name></property>
  </activation>
  <properties>
    <scala.version>2.10.6</scala.version>
    <scala.binary.version>2.10</scala.binary.version>
    <jline.version>${scala.version}</jline.version>
    <jline.groupid>org.scala-lang</jline.groupid>
  </properties>
</profile>

*common/sketch/pom.xml*

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>

On Mon, Jun 19, 2017 at 2:25 PM, Kanagha Kumar 
wrote:

> Thanks. But, I am required to do a maven release to Nexus on spark 2.0.2
> built against scala 2.10.
> How can I go about with this? Is this a bug that I need to open in Spark
> jira?
>
> On Mon, Jun 19, 2017 at 12:12 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Some of the projects (such as spark-tags) are Java projects. Spark doesn't
>> fix the artifact name and just hard-codes 2.11.
>>
>> For your issue, try to use `install` rather than `package`.
>>
>> On Sat, Jun 17, 2017 at 7:20 PM, Kanagha Kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> Bumping up again! Why does spark modules depend upon scala2.11 versions
>>> inspite of changing pom.xmls using ./dev/change-scala-version.sh 2.10.
>>> Appreciate any quick help!!
>>>
>>> Thanks
>>>
>>> On Fri, Jun 16, 2017 at 2:59 PM, Kanagha Kumar 
>>> wrote:
>>>
 Hey all,


 I'm trying to use Spark 2.0.2 with scala 2.10 by following this
 https://spark.apache.org/docs/2.0.2/building-spark.html
 #building-for-scala-210

 ./dev/change-scala-version.sh 2.10
 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package


 I could build the distribution successfully using
 bash -xv dev/make-distribution.sh --tgz  -Dscala-2.10 -DskipTests

 But, when I am trying to maven release, it keeps failing with the error
 using the command:


 Executing Maven:  -B -f pom.xml  -DscmCommentPrefix=[maven-release-plugin]
 -e  -Dscala-2.10 -Pyarn -Phadoop-2.7 -Phadoop-provided -DskipTests
 -Dresume=false -U -X *release:prepare release:perform*

 Failed to execute goal on project spark-sketch_2.10: Could not resolve
 dependencies for project 
 org.apache.spark:spark-sketch_2.10:jar:2.0.2-sfdc-3.0.0:
 *Failure to find org.apache.spark:spark-tags_2.11:jar:2.0.2-sfdc-3.0.0*
 in  was cached in the local repository, resolution will
 not be reattempted until the update interval of nexus has elapsed or
 updates are forced -> [Help 1]


 Why does spark-sketch depend upon spark-tags_2.11 when I have already
 compiled against scala 2.10?? Any pointers would be helpful.
 Thanks
 Kanagha

>>>
>>>
>>
>


spark2.1 kafka0.10

2017-06-21 Thread lk_spark
Hi all:
When I run my streaming application for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, 
bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, 
weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, 
bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 
1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2

I don't know why ?

2017-06-22


lk_spark 

Re: spark2.1 kafka0.10

2017-06-21 Thread lk_spark
java.lang.IllegalStateException: No current assignment for partition pages-2
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
 at scala.Option.orElse(Option.scala:289)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


2017-06-22 

lk_spark 



发件人:"lk_spark"
发送时间:2017-06-22 11:13
主题:spark2.1 kafka0.10
收件人:"user.spark"
抄送:

hi,all: 
when I run stream application for a few minutes ,I got this error : 

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, 
bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, 
weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, 
bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 
1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2

I don't know why ?

2017-06-22


lk_spark 

Re: spark2.1 kafka0.10

2017-06-21 Thread Pralabh Kumar
How many replicas ,you have for this topic .

On Thu, Jun 22, 2017 at 9:19 AM, lk_spark  wrote:

> java.lang.IllegalStateException: No current assignment for partition
> pages-2
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.
> assignedState(SubscriptionState.java:264)
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.
> needOffsetReset(SubscriptionState.java:336)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.
> seekToEnd(KafkaConsumer.java:1236)
>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
> latestOffsets(DirectKafkaInputDStream.scala:197)
>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:214)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>  at org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:415)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
>  at scala.Option.orElse(Option.scala:289)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(
> DStream.scala:330)
>  at org.apache.spark.streaming.dstream.ForEachDStream.
> generateJob(ForEachDStream.scala:48)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(
> DStreamGraph.scala:117)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(
> DStreamGraph.scala:116)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:241)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:241)
>  at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.flatMap(
> TraversableLike.scala:241)
>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>  at org.apache.spark.streaming.DStreamGraph.generateJobs(
> DStreamGraph.scala:116)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$
> anonfun$3.apply(JobGenerator.scala:249)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$
> anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192)
>  at org.apache.spark.streaming.scheduler.JobGenerator.
> generateJobs(JobGenerator.scala:247)
>  at org.apache.spark.streaming.scheduler.JobGenerator.org$
> apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.
> scala:183)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:89)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> 2017-06-22
> --
> lk_spark
> --
>
> *发件人:*"lk_spark"
> *发送时间:*2017-06-22 11:13
> *主题:*spark2.1 kafka0.10
> *收件人:*"user.spark"
> *抄送:*
>
> hi,all:
> when I run stream application for a few minutes ,I got this error :
>
> 17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned
> partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1,
> weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4,
> weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4,
> weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0,
> bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4,
> comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
> 17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
> 17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group
> youedata1 with generation 3
> 17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned
> partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1,
> weibocomment-0, bizs-2, bizs-4, pages-4, weibo-4, clicks-1, comment-1,
> clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4,
> cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-3, clicks-2, weibo-3, clicks-4,
> comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
> 17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time
> 1498098896000 ms
> java.lang.IllegalStateException: No current assignment for partition
> pages-2
>
> I don't know why ?
>
> 2017-06-22
> --
> lk_spark
>
>


Unsubscribe

2017-06-21 Thread Anita Tailor


Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using YARN w/o HDFS

2017-06-21 Thread Chen He
Change your fs.defaultFS to point to the local file system and have a try.
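
For example, a minimal core-site.xml in HADOOP_CONF_DIR could point the default
filesystem at the local FS (a sketch only; yarn-site.xml and the other YARN
configs are still needed):

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>file:///</value>
  </property>
</configuration>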

On Wed, Jun 21, 2017 at 4:50 PM, Alaa Zubaidi (PDF) 
wrote:

> Hi,
>
> Can we run Spark on YARN with out installing HDFS?
> If yes, where would HADOOP_CONF_DIR point to?
>
> Regards,
>


Re: Re: spark2.1 kafka0.10

2017-06-21 Thread lk_spark
Each topic has 5 partitions, 2 replicas.

2017-06-22 

lk_spark 



发件人:Pralabh Kumar 
发送时间:2017-06-22 17:23
主题:Re: spark2.1 kafka0.10
收件人:"lk_spark"
抄送:"user.spark"

How many replicas ,you have for this topic . 


On Thu, Jun 22, 2017 at 9:19 AM, lk_spark  wrote:

java.lang.IllegalStateException: No current assignment for partition pages-2
 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
 at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
 at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
 at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
 at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
 at scala.Option.orElse(Option.scala:289)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
 at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


2017-06-22 

lk_spark 



发件人:"lk_spark"
发送时间:2017-06-22 11:13
主题:spark2.1 kafka0.10
收件人:"user.spark"
抄送:

Hi all,
After the stream application runs for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, 
bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, 
weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, 
bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 
1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2

I don't know why ?

2017-06-22


lk_spark 

Re: Broadcasts & Storage Memory

2017-06-21 Thread Pralabh Kumar
Hi

Broadcast variables are definitely stored in the spark.memory.storageFraction region.

1. If you go into the code of TorrentBroadcast.scala (the writeBlocks method) and follow it
through BlockManager into MemoryStore, you can see that deserialization of the variables
happens in unroll memory, which is then transferred to storage memory:

memoryManager.synchronized {
  // Release the unroll memory used for this block and immediately acquire the same
  // amount as storage memory, so the deserialized block is accounted for in the
  // storage region.
  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
  val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
}


So broadcast variables are definitely stored within spark.memory.storageFraction.
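
As a rough way to see this for yourself (an illustrative sketch, not code from the thread;
the fraction values and sizes below are assumptions, not recommendations), set the two knobs
explicitly, create a broadcast, reference it in a job, and watch the Storage Memory column on
the Executors tab before and after:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: app name and values are assumptions.
val conf = new SparkConf()
  .setAppName("broadcast-storage-check")
  .set("spark.memory.fraction", "0.75")        // size of the unified execution + storage region
  .set("spark.memory.storageFraction", "0.5")  // part of that region protected from eviction
val sc = new SparkContext(conf)

// A broadcast large enough to be visible in the executor storage numbers.
val lookup = sc.broadcast((1 to 1000000).map(i => i -> i.toString).toMap)

// Referencing lookup.value on the executors deserializes the broadcast there,
// which is where its memory accounting should show up.
sc.parallelize(1 to 100).map(i => lookup.value.getOrElse(i, "")).count()

Comparing the Storage Memory number before and after the count() gives a rough idea of how
much of the storage region the broadcast actually occupies.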


Can you explain how you are seeing a smaller amount of memory used for broadcast
variables on a given executor through the UI?

Regards
Pralabh Kumar

On Thu, Jun 22, 2017 at 4:39 AM, Bryan Jeffrey 
wrote:

> Satish,
>
> I agree - that was my impression too. However I am seeing a smaller amount of
> storage memory used on a given executor than the amount of memory required
> for my broadcast variables. I am wondering if the statistics in the UI are
> incorrect or if the broadcasts are simply not a part of that storage memory
> fraction.
>
> Bryan Jeffrey
>
>
>
>
>
> On Wed, Jun 21, 2017 at 6:48 PM -0400, "satish lalam" <
> satish.la...@gmail.com> wrote:
>
> My understanding is - it is from storageFraction. Here cached blocks are
>> immune to eviction - so both persisted RDDs and broadcast variables sit
>> here. Ref
>> 
>>
>>
>> On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Hello.
>>>
>>> Question: Do broadcast variables stored on executors count as part of
>>> 'storage memory' or other memory?
>>>
>>> A little bit more detail:
>>>
>>> I understand that we have two knobs to control memory allocation:
>>> - spark.memory.fraction
>>> - spark.memory.storageFraction
>>>
>>> My understanding is that spark.memory.storageFraction controls the
>>> amount of memory allocated for cached RDDs.  spark.memory.fraction controls
>>> how much memory is allocated to Spark operations (task serialization,
>>> operations, etc.), w/ the remainder reserved for user data structures,
>>> Spark internal metadata, etc.  This includes the storage memory for cached
>>> RDDs.
>>>
>>> You end up with executor memory that looks like the following:
>>> All memory: 0-100
>>> Spark memory: 0-75
>>> RDD Storage: 0-37
>>> Other Spark: 38-75
>>> Other Reserved: 76-100
>>>
>>> Where do broadcast variables fall into the mix?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>
>>


Re: Re: spark2.1 kafka0.10

2017-06-21 Thread Pralabh Kumar
It looks like the replicas for your partitions are failing. If you have more
brokers, can you try increasing the number of replicas, just to make sure at
least one leader is always available?
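
Not from the original thread, but as a rough sketch of how to check this from the consumer
side (the broker address is an assumption; the topic and group id are taken from the log
above), you can ask the client which partitions currently have a leader and how large their
in-sync replica set is:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch only: replace broker1:9092 with a real bootstrap server.
val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("group.id", "youedata1")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
try {
  // partitionsFor only fetches metadata; no subscription is needed.
  consumer.partitionsFor("pages").asScala.foreach { p =>
    println(s"partition=${p.partition()} leader=${p.leader()} " +
      s"replicas=${p.replicas().length} isr=${p.inSyncReplicas().length}")
  }
} finally {
  consumer.close()
}

A partition whose leader() comes back null, or whose ISR has shrunk to zero, would support
the theory above that replicas/leaders are becoming unavailable.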

On Thu, Jun 22, 2017 at 10:34 AM, lk_spark  wrote:

> Each topic has 5 partitions, 2 replicas.
>
> 2017-06-22
> --
> lk_spark
> --
>
> *From:* Pralabh Kumar 
> *Sent:* 2017-06-22 17:23
> *Subject:* Re: spark2.1 kafka0.10
> *To:* "lk_spark"
> *Cc:* "user.spark"
>
> How many replicas do you have for this topic?
>
> On Thu, Jun 22, 2017 at 9:19 AM, lk_spark  wrote:
>
>> java.lang.IllegalStateException: No current assignment for partition pages-2
>>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
>>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
>>  at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
>>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
>>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>  at scala.Option.orElse(Option.scala:289)
>>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>>  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>>  at scala.util.Try$.apply(Try.scala:192)
>>  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>>  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> 2017-06-22
>> --
>> lk_spark
>> --
>>
>> *发件人:*"lk_spark"
>> *发送时间:*2017-06-22 11:13
>> *主题:*spark2.1 kafka0.10
>> *收件人:*"user.spark"
>> *抄送:*
>>
>> Hi all,
>> After the stream application runs for a few minutes, I get this error:
>>
>> 17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned
>> partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1,
>> weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4,
>> weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4,
>> weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0,
>> bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4,
>> comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
>> 17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
>> 17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group
>> youedata1 with generation 3
>> 17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned
>> partitions [comment-0, profile-1, profile-3, cwb