Re: Data Duplication Bug Found - Structured Streaming Versions 3..4.1, 3.2.4, and 3.3.2

2023-09-18 Thread Jerry Peng
Hi Craig,

Thank you for sending us more information.  Can you answer my previous
question which I don't think the document addresses. How did you determine
duplicates in the output?  How was the output data read? The FileStreamSink
provides exactly-once writes ONLY if you read the output with the
FileStreamSource or the FileSource (batch).  A log is used to determine
what data is committed or not and those aforementioned sources know how to
use that log to read the data "exactly-once".  So there may be duplicated
data written on disk.  If you simply just read the data files written to
disk you may see duplicates when there are failures.  However, if you read
the output location with Spark you should get exactly once results (unless
there is a bug) since spark will know how to use the commit log to see what
data files are committed and not.

Best,

Jerry

On Mon, Sep 18, 2023 at 1:18 PM Craig Alfieri 
wrote:

> Hi Russell/Jerry/Mich,
>
>
>
> Appreciate your patience on this.
>
>
>
> Attached are more details on how this duplication “error” was found.
>
> Since we’re still unsure I am using “error” in quotes.
>
>
>
> We’d love the opportunity to work with any of you directly and/or the
> wider Spark community to triage this or get a better understanding of the
> nature of what we’re experiencing.
>
>
>
> Our platform provides the ability to fully reproduce this.
>
>
>
> Once you have had the chance to review the attached draft, let us know if
> there are any questions in the meantime. Again, we welcome the opportunity
> to work with the teams on this.
>
>
>
> Best-
>
> Craig
>
>
>
>
>
>
>
> *From: *Craig Alfieri 
> *Date: *Thursday, September 14, 2023 at 8:45 PM
> *To: *russell.spit...@gmail.com 
> *Cc: *Jerry Peng , Mich Talebzadeh <
> mich.talebza...@gmail.com>, user@spark.apache.org ,
> connor.mc...@antithesis.com 
> *Subject: *Re: Data Duplication Bug Found - Structured Streaming Versions
> 3..4.1, 3.2.4, and 3.3.2
>
> Hi Russell et al,
>
>
>
> Acknowledging receipt; we’ll get these answers back to the group.
>
>
>
> Follow-up forthcoming.
>
>
>
> Craig
>
>
>
>
>
>
>
> On Sep 14, 2023, at 6:38 PM, russell.spit...@gmail.com wrote:
>
> Exactly once should be output sink dependent, what sink was being used?
>
> Sent from my iPhone
>
>
>
> On Sep 14, 2023, at 4:52 PM, Jerry Peng 
> wrote:
>
> 
>
> Craig,
>
>
>
> Thanks! Please let us know the result!
>
>
>
> Best,
>
>
>
> Jerry
>
>
>
> On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
> Hi Craig,
>
>
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Distinguished Technologist, Solutions Architect & Engineer
>
> London
>
> United Kingdom
>
>
>
>  [image: Image removed by sender.]  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
> Hello Spark Community-
>
>
>
> As part of a research effort, our team here at Antithesis tests for
> correctness/fault tolerance of major OSS projects.
>
> Our team recently was testing Spark’s Structured Streaming, and we came
> across a data duplication bug we’d like to work with the teams on to
> resolve.
>
>
>
> Our intention is to utilize this as a future case study for our platform,
> but prior to doing so we like to have a resolution in place so that an
> announcement isn’t alarming to the user base.
>
>
>
> Attached is a high level .pdf that reviews the High Availability set-up
> put under test.
>
> This was also tested across the three latest versions, and the same
> behavior was observed.
>
>
>
> We can reproduce this error readily, since our environment is fully
> deterministic, we are just not Spark experts and would like to work with
> someone in the community to resolve this.
>
>
>
> Please let us know at your earliest convenience.
>
>
>
> Best
>
>
>
> Error! Filename not specified.
>

Re: Data Duplication Bug Found - Structured Streaming Versions 3..4.1, 3.2.4, and 3.3.2

2023-09-14 Thread Jerry Peng
Craig,

Thanks! Please let us know the result!

Best,

Jerry

On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh 
wrote:

>
> Hi Craig,
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
>> Hello Spark Community-
>>
>>
>>
>> As part of a research effort, our team here at Antithesis tests for
>> correctness/fault tolerance of major OSS projects.
>>
>> Our team recently was testing Spark’s Structured Streaming, and we came
>> across a data duplication bug we’d like to work with the teams on to
>> resolve.
>>
>>
>>
>> Our intention is to utilize this as a future case study for our platform,
>> but prior to doing so we like to have a resolution in place so that an
>> announcement isn’t alarming to the user base.
>>
>>
>>
>> Attached is a high level .pdf that reviews the High Availability set-up
>> put under test.
>>
>> This was also tested across the three latest versions, and the same
>> behavior was observed.
>>
>>
>>
>> We can reproduce this error readily, since our environment is fully
>> deterministic, we are just not Spark experts and would like to work with
>> someone in the community to resolve this.
>>
>>
>>
>> Please let us know at your earliest convenience.
>>
>>
>>
>> Best
>>
>>
>>
>> *[image: signature_2327449931]*
>>
>> *Craig Alfieri*
>>
>> c: 917.841.1652
>>
>> craig.alfi...@antithesis.com
>>
>> New York, NY.
>>
>> Antithesis.com
>> 
>>
>>
>>
>> We can't talk about most of the bugs that we've found for our customers,
>>
>> but some customers like to speak about their work with us:
>>
>> https://github.com/mongodb/mongo/wiki/Testing-MongoDB-with-Antithesis
>>
>>
>>
>>
>>
>>
>> *-*
>> *This email and any files transmitted with it are confidential and
>> intended solely for the use of the individual or entity for whom they are
>> addressed. If you received this message in error, please notify the sender
>> and remove it from your system.*
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Unsubscribe

2023-05-01 Thread peng





Re: Potability of dockers built on different cloud platforms

2023-04-05 Thread Ken Peng




ashok34...@yahoo.com.INVALID wrote:
Is it possible to use Spark docker built on GCP on AWS without 
rebuilding from new on AWS?


I am using the spark image from bitnami for running on k8s.
And yes, it's deployed by helm.


--
https://kenpeng.pages.dev/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2023-01-20 Thread peng


unsubscribe



Re: A simple comparison for three SQL engines

2022-04-09 Thread Wes Peng

may I forward this report to spark list as well.

Thanks.

Wes Peng wrote:

Hello,

This weekend I made a test against a big dataset. spark, drill, mysql, 
postgresql were involved.


This is the final report:
https://blog.cloudcache.net/handles-the-file-larger-than-memory/

The simple conclusion:
1. spark is the fastest for this scale of data and limited memory
2. drill is close to spark
3. postgresql has surprising behavior in query speed
4. mysql is really slow

If you have found any issue please let me know.

Thanks

Wes Peng wrote:

sure.I will take time to do it.


Sanel Zukan wrote:

Any chance you can try with Postgres >= 12, default configuration with
the same indexed columns as with MySQL?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executorlost failure

2022-04-07 Thread Wes Peng
I just did a test, even for a single node (local deployment), spark can 
handle the data whose size is much larger than the total memory.


My test VM (2g ram, 2 cores):

$ free -m
  totalusedfree  shared  buff/cache 
available
Mem:   19921845  92  19  54 
 36

Swap:  1023 285 738


The data size:

$ du -h rate.csv
3.2Grate.csv


Loading this file into spark for calculation can be done without error:

scala> val df = spark.read.format("csv").option("inferSchema", 
true).load("skydrive/rate.csv")
val df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 
more fields]


scala> df.printSchema
warning: 1 deprecation (since 2.13.3); for details, enable `:setting 
-deprecation` or `:replay -deprecation`

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)


scala> 
df.groupBy("_c1").agg(avg("_c2").alias("avg_rating")).orderBy(desc("avg_rating")).show
warning: 1 deprecation (since 2.13.3); for details, enable `:setting 
-deprecation` or `:replay -deprecation`
+--+--+ 


|   _c1|avg_rating|
+--+--+
|000136|   5.0|
|0001711474|   5.0|
|0001360779|   5.0|
|0001006657|   5.0|
|0001361155|   5.0|
|0001018043|   5.0|
|000136118X|   5.0|
|202010|   5.0|
|0001371037|   5.0|
|401048|   5.0|
|0001371045|   5.0|
|0001203010|   5.0|
|0001381245|   5.0|
|0001048236|   5.0|
|0001436163|   5.0|
|000104897X|   5.0|
|0001437879|   5.0|
|0001056107|   5.0|
|0001468685|   5.0|
|0001061240|   5.0|
+--+--+
only showing top 20 rows


So as you see spark can handle file larger than its memory well. :)

Thanks


rajat kumar wrote:

With autoscaling can have any numbers of executors.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executorlost failure

2022-04-07 Thread Wes Peng
I once had a file which is 100+GB getting computed in 3 nodes, each node 
has 24GB memory only. And the job could be done well. So from my 
experience spark cluster seems to work correctly for big files larger 
than memory by swapping them to disk.


Thanks

rajat kumar wrote:
Tested this with executors of size 5 cores, 17GB memory. Data vol is 
really high around 1TB


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executorlost failure

2022-04-07 Thread Wes Peng

how many executors do you have?

rajat kumar wrote:
Tested this with executors of size 5 cores, 17GB memory. Data vol is 
really high around 1TB


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



query time comparison to several SQL engines

2022-04-07 Thread Wes Peng
I made a simple test to query time for several SQL engines including 
mysql, hive, drill and spark. The report,


https://cloudcache.net/data/query-time-mysql-hive-drill-spark.pdf

It maybe have no special meaning, just for fun. :)

regards.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Profiling spark application

2022-01-19 Thread Wes Peng

Give a look at this:
https://github.com/LucaCanali/sparkMeasure

On 2022/1/20 1:18, Prasad Bhalerao wrote:
Is there any way we can profile spark applications which will show no. 
of invocations of spark api and their execution time etc etc just the 
way jprofiler shows all the details?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Pyspark] How to download Zip file from SFTP location and put in into Azure Data Lake and unzip it

2022-01-18 Thread Wes Peng
How large is the file? From my experience, reading the excel file from 
data lake and loading as dataframe, works great.


Thanks

On 2022-01-18 22:16, Heta Desai wrote:

Hello,

 I have zip files on SFTP location. I want to download/copy those
files and put into Azure Data Lake. Once the zip files get stored into
Azure Data Lake, I want to unzip those files and read using Data
Frames.

 The file format inside zip is excel. SO, once files are unzipped, I
want to read excel files using spark DataFrames.

 Please help me with the solution as soon as possible.

 Thanks,

 ​Heta Desai | Data | Sr Associate L1
e.heta.de...@1rivet.com | t. +91 966.225.4954

 ​

 This email, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If you are not the intended recipient,
please advise the sender immediately and delete this message and any
attachments. Unless otherwise specifically stated in this email,
transaction related information in this email, including attachments,
is not to be construed as an offer, solicitation or the basis or
confirmation for any contract for the purchase/sale of any services.
All email sent to or from this address will be received by 1Rivet US,
Inc and is subject to archival retention and review by someone other
than the recipient.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ivy unit test case filing for Spark

2021-12-21 Thread Wes Peng
Are you using IvyVPN which causes this problem? If the VPN software changes
the network URL silently you should avoid using them.

Regards.

On Wed, Dec 22, 2021 at 1:48 AM Pralabh Kumar 
wrote:

> Hi Spark Team
>
> I am building a spark in VPN . But the unit test case below is failing.
> This is pointing to ivy location which  cannot be reached within VPN . Any
> help would be appreciated
>
> test("SPARK-33084: Add jar support Ivy URI -- default transitive = true")
> {
>   *sc *= new SparkContext(new 
> SparkConf().setAppName("test").setMaster("local-cluster[3,
> 1, 1024]"))
>   *sc*.addJar("*ivy://org.apache.hive:hive-storage-api:2.7.0*")
>   assert(*sc*.listJars().exists(_.contains(
> "org.apache.hive_hive-storage-api-2.7.0.jar")))
>   assert(*sc*.listJars().exists(_.contains(
> "commons-lang_commons-lang-2.6.jar")))
> }
>
> Error
>
> - SPARK-33084: Add jar support Ivy URI -- default transitive = true ***
> FAILED ***
> java.lang.RuntimeException: [unresolved dependency:
> org.apache.hive#hive-storage-api;2.7.0: not found]
> at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(
> SparkSubmit.scala:1447)
> at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(
> DependencyUtils.scala:185)
> at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(
> DependencyUtils.scala:159)
> at org.apache.spark.SparkContext.addJar(SparkContext.scala:1996)
> at org.apache.spark.SparkContext.addJar(SparkContext.scala:1928)
> at org.apache.spark.SparkContextSuite.$anonfun$new$115(SparkContextSuite.
> scala:1041)
> at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
>
> Regards
> Pralabh Kumar
>
>
>


Re: [ANNOUNCE] Apache Spark 3.2.0

2021-10-19 Thread Henrik Peng
Congrats and thanks!


Gengliang Wang 于2021年10月19日 周二下午10:16写道:

> Hi all,
>
> Apache Spark 3.2.0 is the third release of the 3.x line. With tremendous
> contribution from the open-source community, this release managed to
> resolve in excess of 1,700 Jira tickets.
>
> We'd like to thank our contributors and users for their contributions and
> early feedback to this release. This release would not have been possible
> without you.
>
> To download Spark 3.2.0, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-2-0.html
>


Re: Spark Session error with 30s

2021-04-12 Thread Peng Lei
Hi KhajaAsmath Mohammed
  Please check the configuration of "spark.speculation.interval", just pass
the "30" to it.

 '''
  override def start(): Unit = {

  backend.start()

  if (!isLocal && conf.get(SPECULATION_ENABLED)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(
  () => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
  SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}

 '''


Sean Owen  于2021年4月13日周二 上午3:30写道:

> Something is passing this invalid 30s value, yes. Hard to say which
> property it is. I'd check if your cluster config sets anything with the
> value 30s - whatever is reading this property is not expecting it.
>
> On Mon, Apr 12, 2021, 2:25 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> Do you think anything that can cause this with DFS client?
>>
>> java.lang.NumberFormatException: For input string: "30s"
>> at
>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>> at java.lang.Long.parseLong(Long.java:589)
>> at java.lang.Long.parseLong(Long.java:631)
>>
>>
>>
>> * at
>> org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
>>   at
>> org.apache.hadoop.hdfs.client.impl.DfsClientConf.(DfsClientConf.java:247)
>>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:301)
>> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:285)*
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
>> at
>> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
>> at
>> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
>> at scala.Option.getOrElse(Option.scala:121)
>> at org.apache.spark.deploy.yarn.Client.(Client.scala:137)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
>> at org.apache.spark.SparkContext.(SparkContext.scala:501)
>> at
>> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
>> at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:936)
>> at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession
>>
>> Thanks,
>> Asmath
>>
>> On Mon, Apr 12, 2021 at 2:20 PM KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> I am using spark hbase connector provided by hortonwokrs. I was able to
>>> run without issues in my local environment and has this issue in emr.
>>>
>>> Thanks,
>>> Asmath
>>>
>>> On Apr 12, 2021, at 2:15 PM, Sean Owen  wrote:
>>>
>>> 
>>> Somewhere you're passing a property that expects a number, but give it
>>> "30s". Is it a time property somewhere that really just wants MS or
>>> something? But most time properties (all?) in Spark should accept that type
>>> of input anyway. Really depends on what property has a problem and what is
>>> setting it.
>>>
>>> On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
 HI,

 I am getting weird error when running spark job in emr cluster. Same
 program runs in my local machine. Is there anything that I need to do to
 resolve this?

 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
 java.lang.NumberFormatException: For input string: "30s"

 I tried the solution mentioned in the link below but it didn't work for
 me.


 https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/

 Thanks,
 Asmath

>>>


Question about how hadoop configurations populated in driver/executor pod

2021-03-22 Thread Yue Peng
Hi,

I am trying run sparkPi example via Spark on Kubernetes in my cluster. However, 
it is consistently  failing because of executor does not have the correct 
hadoop configurations. I could fix it by pre-creating a configmap and mounting 
it into executor by specifying in pod template. But I do see in the official 
doc that hadoop configuration will be serialized to executor pods.

Did I miss anything?

Error message in executor pod:
21/03/10 07:00:01 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.IOException: Incomplete HDFS URI, no host: 
hdfs:///tmp/spark-examples_2.12-3.0.125067.jar
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:170)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1853)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:737)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:522)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:871)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:862)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:862)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:406)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)

More information:

https://issues.apache.org/jira/browse/SPARK-34684

Thanks,
Yue



Re: Unsubscribe

2020-12-22 Thread Wesley Peng

Bhavya Jain wrote:

Unsubscribe


please send an email to: user-unsubscr...@spark.apache.org to 
unsubscribe yourself from the list. thanks.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: unsubscribe

2020-06-27 Thread Wesley Peng
please send an empty email to: user-unsubscr...@spark.apache.org to 
unsubscribe yourself from the list.



Sri Kris wrote:
Sent from Mail  for 
Windows 10




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ML] [How-to]: How to unload the loaded W2V model in Pyspark?

2020-02-17 Thread Zhefu PENG
Hi all,

I'm using pyspark and Spark-ml to train and use Word2Vect model, here is
the logic of my program:

model = Word2VecModel.load("save path")

result_list = model.findSynonymsArray(target, top_N)

Then I use the graphframe and result_list to create graph and do some
computing. However the program failed due to the out of memory error: xxx
is running beyond physical memory limits. As a result, I want to delete the
word2vec model to free memory, since I don't need to use it after getting
the result_list.

I tried using del function in Python, and
spark.sparkContext._gateway.detach(model._java_obj)
as
https://stackoverflow.com/questions/58759929/how-to-free-the-memory-taken-by-a-pyspark-model-javamodel
suggested.
But neither two worked.

Is there anyway to unload or delete the loaded w2v model in Spark or
Pyspark?

Really appreciate for any reply and help.

Best,
Zhefu


Re: [ANNOUNCE] Announcing Apache Spark 2.4.4

2019-09-01 Thread Wesley Peng




on 2019/9/2 5:54, Dongjoon Hyun wrote:

We are happy to announce the availability of Spark 2.4.4!

Spark 2.4.4 is a maintenance release containing stability fixes. This
release is based on the branch-2.4 maintenance branch of Spark. We strongly
recommend all 2.4 users to upgrade to this stable release.


That's awesome. thanks for the work.

regards.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to work around NoOffsetForPartitionException when using Spark Streaming

2018-06-01 Thread Martin Peng
Hi,

We see below exception when using Spark Kafka streaming 0.10 on a normal
Kafka topic. Not sure why offset missing in zk, but since Spark streaming
override the offset reset policy to none in the code. I can not set the
reset policy to latest(I don't really care data loss now).

Is there any quick way to fix the missing offset or work around this?

Thanks,
Martin

1/06/2018 17:11:02: ERROR:the type of error is
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition:
elasticsearchtopicrealtimereports-97
01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for
partition: elasticsearchtopicrealtimereports-97
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248)
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.immutable.List.map(List.scala:285)
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstre

spark jdbc postgres query results don't match those of postgres query

2018-03-29 Thread Kevin Peng
I am running into a weird issue in Spark 1.6, which I was wondering if
anyone has encountered before. I am running a simple select query from
spark using a jdbc connection to postgres: val POSTGRES_DRIVER: String =
"org.postgresql.Driver" val srcSql = """select total_action_value,
last_updated from fb_fact_no_seg_20180123 where ad_id =
'23842688418150437'"" val r = sqlContext.read.format("jdbc").options(Map(
"url" -> jdbcUrl, "dbtable" -> s"($srcSql) as src" , "driver" ->
POSTGRES_DRIVER )).load().coalesce(1).cache() r.show
+--++ |total_action_value|
last_updated| +--++ |
2743.3301|2018-02-06 00:18:...| +--++
>From above you see that the result is 2743.3301, but when I run the same
query directly in postgres I get a slightly different answer: select
total_action_value, last_updated from fb_fact_no_seg_20180123 where ad_id =
'23842688418150437'; total_action_value | last_updated
+- 2743.33 | 2018-02-06 00:18:08 As
you can see from above the value is 2743.33. So why is the result coming
from spark off by .0001; basically where is .0001 coming from since in
postgres the decimal value is .33? Thanks, KP


Re: Spark Job crash due to File Not found when shuffle intermittently

2017-07-25 Thread Martin Peng
cool~ Thanks Kang! I will check and let you know.
Sorry for delay as there is an urgent customer issue today.

Best
Martin

2017-07-24 22:15 GMT-07:00 周康 :

> * If the file exists but is a directory rather than a regular file, does
> * not exist but cannot be created, or cannot be opened for any other
> * reason then a FileNotFoundException is thrown.
>
> After searching into FileOutputStream i saw this annotation.So you can check 
> executor node first(may be no permission or no space,or no enough file 
> descriptor)
>
>
> 2017-07-25 13:05 GMT+08:00 周康 :
>
>> You can also check whether space left in the executor node enough to
>> store shuffle file or not.
>>
>> 2017-07-25 13:01 GMT+08:00 周康 :
>>
>>> First,spark will handle task fail so if job ended normally , this error
>>> can be ignore.
>>> Second, when using BypassMergeSortShuffleWriter, it will first write
>>> data file then write an index file.
>>> You can check "Failed to delete temporary index file at" or "fail to
>>> rename file" in related executor node's log file.
>>>
>>> 2017-07-25 0:33 GMT+08:00 Martin Peng :
>>>
>>>> Is there anyone at share me some lights about this issue?
>>>>
>>>> Thanks
>>>> Martin
>>>>
>>>> 2017-07-21 18:58 GMT-07:00 Martin Peng :
>>>>
>>>>> Hi,
>>>>>
>>>>> I have several Spark jobs including both batch job and Stream jobs to
>>>>> process the system log and analyze them. We are using Kafka as the 
>>>>> pipeline
>>>>> to connect each jobs.
>>>>>
>>>>> Once upgrade to Spark 2.1.0 + Spark Kafka Streaming 010, I found some
>>>>> of the jobs(both batch or streaming) are thrown below exceptions
>>>>> randomly(either after several hours run or just run in 20 mins). Can 
>>>>> anyone
>>>>> give me some suggestions about how to figure out the real root cause?
>>>>> (Looks like google result is not very useful...)
>>>>>
>>>>> Thanks,
>>>>> Martin
>>>>>
>>>>> 00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task
>>>>> 60.0 in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
>>>>> java.io.FileNotFoundException: /mnt/mesos/work_dir/slaves/201
>>>>> 60924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a
>>>>> -4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-
>>>>> 4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c356
>>>>> 43e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
>>>>> (No such file or directory)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native
>>>>> Method)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.open(
>>>>> FileOutputStream.java:270)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.>>>> >(FileOutputStream.java:213)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.>>>> >(FileOutputStream.java:162)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.shuffle.Index
>>>>> ShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlo
>>>>> ckResolver.scala:144)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.shuffle.sort.
>>>>> BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWri
>>>>> ter.java:128)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Shu
>>>>> ffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Shu
>>>>> ffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Tas
>>>>> k.run(Task.scala:99)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.executor.Exec
>>>>> utor$TaskRunner.run(Executor.scala:282)
>>>>> 00:30:04,510 WARN  - at java.util.concurrent.ThreadPoo
>>>>> lExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> 00:30:04,510 WARN  - at java.util.concurrent.ThreadPoo
>>>>> lExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> 00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> 00:30:04,580 INFO  - Driver stacktrace:
>>>>> 00:30:04,580 INFO  - org.apa

Re: Spark Job crash due to File Not found when shuffle intermittently

2017-07-24 Thread Martin Peng
Is there anyone at share me some lights about this issue?

Thanks
Martin

2017-07-21 18:58 GMT-07:00 Martin Peng :

> Hi,
>
> I have several Spark jobs including both batch job and Stream jobs to
> process the system log and analyze them. We are using Kafka as the pipeline
> to connect each jobs.
>
> Once upgrade to Spark 2.1.0 + Spark Kafka Streaming 010, I found some of
> the jobs(both batch or streaming) are thrown below exceptions
> randomly(either after several hours run or just run in 20 mins). Can anyone
> give me some suggestions about how to figure out the real root cause?
> (Looks like google result is not very useful...)
>
> Thanks,
> Martin
>
> 00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
> in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
> java.io.FileNotFoundException: /mnt/mesos/work_dir/slaves/
> 20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-
> 8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-
> 27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-
> c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
> (No such file or directory)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.open(
> FileOutputStream.java:270)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.<
> init>(FileOutputStream.java:213)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.<
> init>(FileOutputStream.java:162)
> 00:30:04,510 WARN  - at org.apache.spark.shuffle.
> IndexShuffleBlockResolver.writeIndexFileAndCommit(
> IndexShuffleBlockResolver.scala:144)
> 00:30:04,510 WARN  - at org.apache.spark.shuffle.sort.
> BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> Task.run(Task.scala:99)
> 00:30:04,510 WARN  - at org.apache.spark.executor.
> Executor$TaskRunner.run(Executor.scala:282)
> 00:30:04,510 WARN  - at java.util.concurrent.
> ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 00:30:04,510 WARN  - at java.util.concurrent.
> ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)
>
> 00:30:04,580 INFO  - Driver stacktrace:
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1435)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1423)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1422)
> 00:30:04,580 INFO  - scala.collection.mutable.
> ResizableArray$class.foreach(ResizableArray.scala:59)
> 00:30:04,580 INFO  - scala.collection.mutable.ArrayBuffer.foreach(
> ArrayBuffer.scala:48)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1422)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.
> handleTaskSetFailed(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> 00:30:04,580 INFO  - org.apache.spark.util.EventLoop$$anon$1.run(
> EventLoop.scala:48)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.runJob(
> DAGScheduler.scala:628)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1918)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1931)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1944)
> 00:30:04,580 INFO  - org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.
> scala:1353)
> 00:30:04,580 INFO  - org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> 00:30:04,580 INFO  - org.apache.spark.rdd.

Spark Job crash due to File Not found when shuffle intermittently

2017-07-21 Thread Martin Peng
Hi,

I have several Spark jobs including both batch job and Stream jobs to
process the system log and analyze them. We are using Kafka as the pipeline
to connect each jobs.

Once upgrade to Spark 2.1.0 + Spark Kafka Streaming 010, I found some of
the jobs(both batch or streaming) are thrown below exceptions
randomly(either after several hours run or just run in 20 mins). Can anyone
give me some suggestions about how to figure out the real root cause?
(Looks like google result is not very useful...)

Thanks,
Martin

00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
java.io.FileNotFoundException:
/mnt/mesos/work_dir/slaves/20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
(No such file or directory)
00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
00:30:04,510 WARN  - at
java.io.FileOutputStream.open(FileOutputStream.java:270)
00:30:04,510 WARN  - at
java.io.FileOutputStream.(FileOutputStream.java:213)
00:30:04,510 WARN  - at
java.io.FileOutputStream.(FileOutputStream.java:162)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.Task.run(Task.scala:99)
00:30:04,510 WARN  - at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)

00:30:04,580 INFO  - Driver stacktrace:
00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
00:30:04,580 INFO  -
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
00:30:04,580 INFO  -
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.take(RDD.scala:1326)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(

The stability of Spark Stream Kafka 010

2017-06-29 Thread Martin Peng
Hi,

We planned to upgrade our Spark Kafka library to 0.10 from 0.81 to simplify
our infrastructure code logic. Does anybody know when will the 010 version
become stable from experimental?
May I use this 010 version together with Spark 1.5.1?

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Thanks
Martin


Re: Setting Optimal Number of Spark Executor Instances

2017-03-15 Thread Kevin Peng
Mohini,

We set that parameter before we went and played with the number of
executors and that didn't seem to help at all.

Thanks,

KP

On Tue, Mar 14, 2017 at 3:37 PM, mohini kalamkar 
wrote:

> Hi,
>
> try using this parameter --conf spark.sql.shuffle.partitions=1000
>
> Thanks,
> Mohini
>
> On Tue, Mar 14, 2017 at 3:30 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am currently on Spark 1.6 and I was doing a sql join on two tables that
>> are over 100 million rows each and I noticed that it was spawn 3+
>> tasks
>> (this is the progress meter that we are seeing show up).  We tried to
>> coalesece, repartition and shuffle partitions to drop the number of tasks
>> down because we were getting time outs due to the number of task being
>> spawned, but those operations did not seem to reduce the number of tasks.
>> The solution we came up with was actually to set the num executors to 50
>> (--num-executors=50) and it looks like it spawned 200 active tasks, but
>> the
>> total number of tasks remained the same.  Was wondering if anyone knows
>> what
>> is going on?  Is there an optimal number of executors, I was under the
>> impression that the default dynamic allocation would pick the optimal
>> number
>> of executors for us and that this situation wouldn't happen.  Is there
>> something I am missing?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Setting-Optimal-Number-of-Spark-Execut
>> or-Instances-tp28493.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Thanks & Regards,
> Mohini Kalamkar
> M: +1 310 567 9329 <(310)%20567-9329>
>


Re: udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
df:  
-
a|b|c
---
1|m|n
1|x | j
2|m|x
...


import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

def my_zip(c, d):
return dict(zip(c, d))

my_zip = F.udf(_my_zip, MapType(StingType(), StringType(), True), True)

df.groupBy('a').agg(my_zip(collect_list('c'),
collect_list('d')).alias('named_list'))



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
btw, i am using spark 1.6.1



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
Hi, 

is there a way to write a udf in pyspark support agg()? 


i search all over the docs and internet, and tested it out.. some say yes,
some say no.

and when i try those yes code examples, just complaint about

AnalysisException: u"expression 'pythonUDF' is neither present in the group
by, nor is it an aggregate function. Add to group by or wrap in first() (or
first_value) if you don't care which value you get.;"



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Mike,

It looks like you are right.  The result seem to be fine.  It looks like I
messed up on the filtering clause.

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE (s.date >= '2016-01-03' OR s.date IS NULL) AND (d.date >=
'2016-01-03' OR d.date IS NULL)").count()
res2: Long = 53042

Davies, Cesar, Gourav,

Thanks for the support.

KP

On Tue, May 3, 2016 at 11:26 AM, Michael Segel 
wrote:

> Silly question?
>
> If you change the predicate to
> ( s.date >= ‘2016-01-03’ OR s.date IS NULL )
> AND
> (d.date >= ‘2016-01-03’ OR d.date IS NULL)
>
> What do you get?
>
> Sorry if the syntax isn’t 100% correct. The idea is to not drop null
> values from the query.
> I would imagine that this shouldn’t kill performance since its most likely
> a post join filter on the result set?
> (Or is that just a Hive thing?)
>
> -Mike
>
> > On May 3, 2016, at 12:42 PM, Davies Liu  wrote:
> >
> > Bingo, the two predicate s.date >= '2016-01-03' AND d.date >=
> > '2016-01-03' is the root cause,
> > which will filter out all the nulls from outer join, will have same
> > result as inner join.
> >
> > In Spark 2.0, we turn these join into inner join actually.
> >
> > On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
> >> Hi
> >>
> >> Have you tried the joins without the where clause? When you use them
> you are
> >> filtering all the rows with null columns in those fields. In other
> words you
> >> are doing a inner join in all your queries.
> >>
> >> On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> >> wrote:
> >>>
> >>> Hi Kevin,
> >>>
> >>> Having given it a first look I do think that you have hit something
> here
> >>> and this does not look quite fine. I have to work on the multiple AND
> >>> conditions in ON and see whether that is causing any issues.
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
> >>>>
> >>>> Davies,
> >>>>
> >>>> Here is the code that I am typing into the spark-shell along with the
> >>>> results (my question is at the bottom):
> >>>>
> >>>> val dps =
> >>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>>> "true").load("file:///home/ltu/dps_csv/")
> >>>> val swig =
> >>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>>> "true").load("file:///home/ltu/swig_csv/")
> >>>>
> >>>> dps.count
> >>>> res0: Long = 42694
> >>>>
> >>>> swig.count
> >>>> res1: Long = 42034
> >>>>
> >>>>
> >>>> dps.registerTempTable("dps_pin_promo_lt")
> >>>> swig.registerTempTable("swig_pin_promo_lt")
> >>>>
> >>>> sqlContext.sql("select * from dps_pin_promo_lt where date >
> >>>> '2016-01-03'").count
> >>>> res4: Long = 42666
> >>>>
> >>>> sqlContext.sql("select * from swig_pin_promo_lt where date >
> >>>> '2016-01-03'").count
> >>>> res5: Long = 34131
> >>>>
> >>>> sqlContext.sql("select distinct date, account, ad from
> dps_pin_promo_lt
> >>>> where date > '2016-01-03'").count
> >>>> res6: Long = 42533
> >>>>
> >>>> sqlContext.sql("select distinct date, account, ad from
> swig_pin_promo_lt
> >>>> where date > '2016-01-03'").count
> >>>> res7: Long = 34131
> >>>>
> >>>>
> >>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
> >>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> >>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> >>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
> s.ad =
> >>&

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

What exactly do you mean in regards to Spark 2.0 turning these join into an
inner join?  Does this mean that spark sql won't be supporting where
clauses in outer joins?


Cesar & Gourav,

When running the queries without the where clause it works as expected.  I
am pasting my results below:
val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res5: Long = 60919


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res6: Long = 42034


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res7: Long = 42694

Thanks,

KP


On Tue, May 3, 2016 at 10:42 AM, Davies Liu  wrote:

> Bingo, the two predicate s.date >= '2016-01-03' AND d.date >=
> '2016-01-03' is the root cause,
>  which will filter out all the nulls from outer join, will have same
> result as inner join.
>
> In Spark 2.0, we turn these join into inner join actually.
>
> On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
> > Hi
> >
> > Have you tried the joins without the where clause? When you use them you
> are
> > filtering all the rows with null columns in those fields. In other words
> you
> > are doing a inner join in all your queries.
> >
> > On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> > wrote:
> >>
> >> Hi Kevin,
> >>
> >> Having given it a first look I do think that you have hit something here
> >> and this does not look quite fine. I have to work on the multiple AND
> >> conditions in ON and see whether that is causing any issues.
> >>
> >> Regards,
> >> Gourav Sengupta
> >>
> >> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
> >>>
> >>> Davies,
> >>>
> >>> Here is the code that I am typing into the spark-shell along with the
> >>> results (my question is at the bottom):
> >>>
> >>> val dps =
> >>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>> "true").load("file:///home/ltu/dps_csv/")
> >>> val swig =
> >>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>> "true").load("file:///home/ltu/swig_csv/")
> >>>
> >>> dps.count
> >>> res0: Long = 42694
> >>>
> >>> swig.count
> >>> res1: Long = 42034
> >>>
> >>>
> >>> dps.registerTempTable("dps_pin_promo_lt")
> >>> swig.registerTempTable("swig_pin_promo_lt")
> >>>
> >>> sqlContext.sql("select * from dps_pin_promo_lt where date >
> >>> '2016-01-03'").count
> >>> res4: Long = 42666
> >>>
> >>> sqlContext.sql("select * from swig_pin_promo_lt where date >
> >>> '2016-01-03'").count
> >>> res5: Long = 34131
> >>>
> >>> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
> >>> where date > '2016-01-03'").count
> >>> res6: Long = 42533
> >>>
> >>> sqlContext.sql("select distinct date, account, ad from
> swig_pin_promo_lt
> >>> where date > '2016-01-03'").count
> >>> res7: Long = 34131
> >>>
> >>>
> >>> sqlContext.sql("SELECT s.date AS edat

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

Here is the code that I am typing into the spark-shell along with the
results (my question is at the bottom):

val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")

sqlContext.sql("select * from dps_pin_promo_lt where date >
'2016-01-03'").count
res4: Long = 42666

sqlContext.sql("select * from swig_pin_promo_lt where date >
'2016-01-03'").count
res5: Long = 34131

sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
where date > '2016-01-03'").count
res6: Long = 42533

sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
where date > '2016-01-03'").count
res7: Long = 34131


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res9: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res10: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res11: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res12: Long = 23809



>From my results above, we notice that the counts of distinct values based
on the join criteria and filter criteria for each individual table is
located at res6 and res7.  My question is why is the outer join producing
less rows than the smallest table; if there are no matches it should still
bring in that row as part of the outer join.  For the full and right outer
join I am expecting to see a minimum of res6 rows, but I get less, is there
something specific that I am missing here?  I am expecting that the full
outer join would give me the union of the two table sets so I am expecting
at least 42533 rows not 23809.


Gourav,

I just ran this result set on a new session with slightly newer data...
still seeing those results.



Thanks,

KP


On Mon, May 2, 2016 at 11:16 PM, Davies Liu  wrote:

> as @Gourav said, all the join with different join type show the same
> results,
> which meant that all the rows from left could match at least one row from
> right,
> all the rows from right could match at least one row from left, even
> the number of row from left does not equal that of right.
>
> This is correct result.
>
> On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
> > Yong,
> >
> > Sorry, let explain my deduction; it is going be difficult to get a sample
> > data out since the dataset I am using is proprietary.
> >
> > From the above set queries (ones mentioned in above comments) both inner
> and
> > outer join are producing the same counts.  They are basically pulling out
> > selected columns from the query, but there is no roll up happening or
> > anything that would possible make it suspicious that there is any
> difference
> > besides the type of joins.  The tables are matched based on three keys
> that
> > are present in both tables (ad, account, and date), on top of this they
> are
> > filtered by date being above 2016-01-03.  Since all the joins are
> producing
> > the same counts, the natural suspicions is that the tables are identical,
> > but I when I run the following two queries:
> >
&g

Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Yong,

Sorry, let explain my deduction; it is going be difficult to get a sample
data out since the dataset I am using is proprietary.

>From the above set queries (ones mentioned in above comments) both inner
and outer join are producing the same counts.  They are basically pulling
out selected columns from the query, but there is no roll up happening or
anything that would possible make it suspicious that there is any
difference besides the type of joins.  The tables are matched based on
three keys that are present in both tables (ad, account, and date), on top
of this they are filtered by date being above 2016-01-03.  Since all the
joins are producing the same counts, the natural suspicions is that the
tables are identical, but I when I run the following two queries:

scala> sqlContext.sql("select * from swig_pin_promo_lt where date
>='2016-01-03'").count

res14: Long = 34158

scala> sqlContext.sql("select * from dps_pin_promo_lt where date
>='2016-01-03'").count

res15: Long = 42693


The above two queries filter out the data based on date used by the joins
of 2016-01-03 and you can see the row count between the two tables are
different, which is why I am suspecting something is wrong with the outer
joins in spark sql, because in this situation the right and outer joins may
produce the same results, but it should not be equal to the left join and
definitely not the inner join; unless I am missing something.


Side note: In my haste response above I posted the wrong counts for
dps.count, the real value is res16: Long = 42694


Thanks,


KP



On Mon, May 2, 2016 at 12:50 PM, Yong Zhang  wrote:

> We are still not sure what is the problem, if you cannot show us with some
> example data.
>
> For dps with 42632 rows, and swig with 42034 rows, if dps full outer join
> with swig on 3 columns; with additional filters, get the same resultSet row
> count as dps lefter outer join with swig on 3 columns, with additional
> filters, again get the the same resultSet row count as dps right outer join
> with swig on 3 columns, with same additional filters.
>
> Without knowing your data, I cannot see the reason that has to be a bug in
> the spark.
>
> Am I misunderstanding your bug?
>
> Yong
>
> --
> From: kpe...@gmail.com
> Date: Mon, 2 May 2016 12:11:18 -0700
> Subject: Re: Weird results with Spark SQL Outer joins
> To: gourav.sengu...@gmail.com
> CC: user@spark.apache.org
>
>
> Gourav,
>
> I wish that was case, but I have done a select count on each of the two
> tables individually and they return back different number of rows:
>
>
> dps.registerTempTable("dps_pin_promo_lt")
> swig.registerTempTable("swig_pin_promo_lt")
>
>
> dps.count()
> RESULT: 42632
>
>
> swig.count()
> RESULT: 42034
>
> On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> This shows that both the tables have matching records and no mismatches.
> Therefore obviously you have the same results irrespective of whether you
> use right or left join.
>
> I think that there is no problem here, unless I am missing something.
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>
> Also, the results of the inner query produced the same results:
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS
> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> =
> d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
> RESULT:23747
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Gourav,

I wish that was case, but I have done a select count on each of the two
tables individually and they return back different number of rows:


dps.registerTempTable("dps_pin_promo_lt")

swig.registerTempTable("swig_pin_promo_lt")


dps.count()

RESULT: 42632


swig.count()

RESULT: 42034

On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta 
wrote:

> This shows that both the tables have matching records and no mismatches.
> Therefore obviously you have the same results irrespective of whether you
> use right or left join.
>
> I think that there is no problem here, unless I am missing something.
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>
>> Also, the results of the inner query produced the same results:
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Gourav,

Apologies.  I edited my post with this information:
Spark version: 1.6
Result from spark shell
OS: Linux version 2.6.32-431.20.3.el6.x86_64 (
mockbu...@c6b9.bsys.dev.centos.org) (gcc version 4.4.7 20120313 (Red Hat
4.4.7-4) (GCC) ) #1 SMP Thu Jun 19 21:14:45 UTC 2014

Thanks,

KP

On Mon, May 2, 2016 at 11:05 AM, Gourav Sengupta 
wrote:

> Hi,
>
> As always, can you please write down details regarding your SPARK cluster
> - the version, OS, IDE used, etc?
>
> Regards,
> Gourav Sengupta
>
> On Mon, May 2, 2016 at 5:58 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am running into a weird result with Spark SQL Outer joins.  The results
>> for all of them seem to be the same, which does not make sense due to the
>> data.  Here are the queries that I am running with the results:
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT: 23747
>>
>> Was wondering if someone had encountered this issues before.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread Kevin Peng
Ted,

What triggerAndWait does is perform a rest call to a specified url and then
waits until the status message that gets returned by that url in a json a
field says complete.  The issues is I put a println at the very top of the
method and that doesn't get printed out, and I know that println isn't
causing an issues because there is an exception that I throw further down
the line and that exception is what I am currently getting, but none of the
println along the way are showing:


  def triggerAndWait(url: String, pollInterval: Int = 1000 * 30,

timeOut: Int = 1000 * 60 * 60, connectTimeout: Int = 3,

readTimeout: Int = 3, requestMethod: String = "GET"): Boolean = {

println("Entering triggerAndWait function - url: " + url +

  " pollInterval: " + pollInterval.toString() + " timeOut: " +

  timeOut.toString() + " connectionTimeout: " +

  connectTimeout.toString() + " readTimeout: " + readTimeout.toString()
+

  " requestMethod: " + requestMethod)


.


Thanks,


KP

On Mon, Mar 28, 2016 at 1:52 PM, Ted Yu  wrote:

> Can you describe what gets triggered by triggerAndWait ?
>
> Cheers
>
> On Mon, Mar 28, 2016 at 1:39 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am currently trying to debug a spark application written in scala.  I
>> have
>> a main method:
>>   def main(args: Array[String]) {
>> ...
>>  SocialUtil.triggerAndWait(triggerUrl)
>> ...
>>
>> The SocialUtil object is included in a seperate jar.  I launched the
>> spark-submit command using --jars passing the SocialUtil jar.  Inside the
>> triggerAndWait function I have a println statement that is the first thing
>> in the method, but it doesn't seem to be coming out.  All println that
>> happen inside the main function directly are appearing though.  I was
>> wondering if anyone knows what is going on in this situation and how I can
>> go about making the println in the SocialUtil object appear.
>>
>> Thanks,
>>
>> KP
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/println-not-appearing-in-libraries-when-running-job-using-spark-submit-master-local-tp26617.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema and apply it to a RDD of
Rows and save the resulting DataFrame as a parquet file, but I got
"java.lang.ClassCastException:
java.lang.String cannot be cast to java.lang.Long" on the last step.

Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:
val gameId= Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)

  return Row(gameId, accountType, worldId, dtEventTime, iEventId)

2. Generate the schema:
 return StructType(Array(StructField("GameId", LongType, true),
StructField("AccountType", LongType, true), StructField("WorldId",
LongType, true), StructField("dtEventTime", StringType, true),
StructField("iEventId",
StringType, true)))

3. Apply the schema and apply it to the RDD of Rows:
val schemaRdd = sqlContext.createDataFrame(rowRdd, schema)

4. Save schemaRdd as a parquet file:
 schemaRdd.saveAsParquetFile(dst + "/" + tableName + ".parquet")

However, it gave me a ClassCastException on step 4 (the DataFrame, i.e.
schemaRdd, can be correctly printed out according to the specified schema).

Thank you for your help!

Best,
Emma

Stack trace of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage
1.0 (TID 12, 10-4-28-24): java.lang.ClassCastException: java.lang.String
cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at
org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:357)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:338)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:324)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org
$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema and apply it to a RDD of
Rows and save the resulting DataFrame as a parquet file.
Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:
val gameId= Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)
  val vVersionId = line(5)
  val vUin = line(6)
  val vClientIp = line(7)
  val vZoneId = line(8)
  val dtCreateTime = line(9)
  val iFeeFlag = Long.valueOf(line(10))
  val vLoginWay = line(11)

  return Row(gameId, accountType, worldId, dtEventTime, iEventId,
vVersionId, vUin, vClientIp,
 vZoneId, dtCreateTime, vZoneId, dtCreateTime, iFeeFlag,
vLoginWay)


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Thanks all for your information. Andrew, I dig out one of your old post
which is relevant:

http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-td5798.html

But didn't mention how to supply the properties that don't start with spark.

On 12 June 2015 at 19:39, Ted Yu  wrote:

> This is the SPARK JIRA which introduced the warning:
>
> [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
> in spark-shell and spark-submit
>
> On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng  wrote:
>
>> Hi Andrew,
>>
>> Thanks a lot! Indeed, it doesn't start with spark, the following
>> properties are read by implementation of the driver rather than spark conf:
>>
>> --conf spooky.root=s3n://spooky- \
>> --conf spooky.checkpoint=s3://spooky-checkpoint \
>>
>> This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
>> set the same properties?
>>
>> Yours Peng
>>
>> On 12 June 2015 at 14:20, Andrew Or  wrote:
>>
>>> Hi Peng,
>>>
>>> Setting properties through --conf should still work in Spark 1.4. From
>>> the warning it looks like the config you are trying to set does not start
>>> with the prefix "spark.". What is the config that you are trying to set?
>>>
>>> -Andrew
>>>
>>> 2015-06-12 11:17 GMT-07:00 Peng Cheng :
>>>
>>>> In Spark <1.3.x, the system property of the driver can be set by --conf
>>>> option, shared between setting spark properties and system properties.
>>>>
>>>> In Spark 1.4.0 this feature is removed, the driver instead log the
>>>> following
>>>> warning:
>>>>
>>>> Warning: Ignoring non-spark config property: xxx.xxx=v
>>>>
>>>> How do set driver's system property in 1.4.0? Is there a reason it is
>>>> removed without a deprecation warning?
>>>>
>>>> Thanks a lot for your advices.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Hi Andrew,

Thanks a lot! Indeed, it doesn't start with spark, the following properties
are read by implementation of the driver rather than spark conf:

--conf spooky.root=s3n://spooky- \
--conf spooky.checkpoint=s3://spooky-checkpoint \

This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set
the same properties?

Yours Peng

On 12 June 2015 at 14:20, Andrew Or  wrote:

> Hi Peng,
>
> Setting properties through --conf should still work in Spark 1.4. From the
> warning it looks like the config you are trying to set does not start with
> the prefix "spark.". What is the config that you are trying to set?
>
> -Andrew
>
> 2015-06-12 11:17 GMT-07:00 Peng Cheng :
>
>> In Spark <1.3.x, the system property of the driver can be set by --conf
>> option, shared between setting spark properties and system properties.
>>
>> In Spark 1.4.0 this feature is removed, the driver instead log the
>> following
>> warning:
>>
>> Warning: Ignoring non-spark config property: xxx.xxx=v
>>
>> How do set driver's system property in 1.4.0? Is there a reason it is
>> removed without a deprecation warning?
>>
>> Thanks a lot for your advices.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


[Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
In Spark <1.3.x, the system property of the driver can be set by --conf
option, shared between setting spark properties and system properties.

In Spark 1.4.0 this feature is removed, the driver instead log the following
warning:

Warning: Ignoring non-spark config property: xxx.xxx=v

How do set driver's system property in 1.4.0? Is there a reason it is
removed without a deprecation warning?

Thanks a lot for your advices.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2015-05-21 Thread Peng Cheng
I stumble upon this thread and I conjecture that this may affect restoring a
checkpointed RDD as well:

http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-gt-10-hour-between-stage-latency-td22925.html#a22928

In my case I have 1600+ fragmented checkpoint file and the time to read all
metadata takes a staggering 11 hours.

If this is really the cause then its an obvious handicap, as checkponted RDD
already has all file parttition information available and doesn't need to to
read them from s3 into driver again (which cause a single-point-of-failure
and a bottleneck).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p22984.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Turns out the above thread is unrelated: it was caused by using s3:// instead
of s3n://. Which I already avoided in my checkpointDir configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Looks like this problem has been mentioned before:

http://qnalist.com/questions/5666463/downloads-from-s3-exceedingly-slow-when-running-on-spark-ec2

and a temporarily solution is to deploy on a dedicated EMR/S3 configuration.
I'll go for that one for a shot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
BTW: My thread dump of the driver's main thread looks like it is stuck on
waiting for Amazon S3 bucket metadata for a long time (which may suggests
that I should move checkpointing directory from S3 to HDFS):

Thread 1: main (RUNNABLE) 
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
sun.security.ssl.InputRecord.read(InputRecord.java:480)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223)
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
I'm implementing one of my machine learning/graph analysis algorithm on
Apache Spark:

The algorithm is very iterative (like all other ML algorithms), but it has a
rather strange workflow: first a subset of all training data (called seeds
RDD: {S_1} is randomly selected) and loaded, in each iteration, the seeds
{S_n} will update itself {S_n+1} and yield a model RDD: {M_n}. After the
seeds have reached a condition the iteration will stop and all model RDD are
aggregated to yield the final result.

Like all iterative implementation in MLLib, both {S_} and {M_} has to be
checkpointed regularly (which seems to be more efficient than commiting {M}
into a growing RDD and cache/checkpoint it: old data already on HDFS don't
have to be written into disk again or take memory space until the final
stage).

However, before the final step when all {M_*} are aggregated. The spark
seems to get frozen: all stages/jobs are completed, no new stage/job are
pending, and all drivers and clusters are running but doing nothing (the
algorithm is still far from completion).

I have to wait for 10+ hours before it start to proceed. So the latency
between stages on UI looks really weird (see the sharp contrast between 15s
task running time and 10h+ between-stage latency?):

 

I wonder if my implementation for algorithm is not optimized for Spark? Or I
simply encounter a hidden issue? Thanks a lot for your opinion



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What are the likely causes of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle?

2015-04-24 Thread Peng Cheng
I'm deploying a Spark data processing job on an EC2 cluster, the job is small
for the cluster (16 cores with 120G RAM in total), the largest RDD has only
76k+ rows. But heavily skewed in the middle (thus requires repartitioning)
and each row has around 100k of data after serialization. The job always got
stuck in repartitioning. Namely, the job will constantly get following
errors and retries:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle

org.apache.spark.shuffle.FetchFailedException: Error in opening
FileSegmentManagedBuffer

org.apache.spark.shuffle.FetchFailedException:
java.io.FileNotFoundException: /tmp/spark-...
I've tried to identify the problem but it seems like both memory and disk
consumption of the machine throwing these errors are below 50%. I've also
tried different configurations, including:

let driver/executor memory use 60% of total memory.
let netty to priortize JVM shuffling buffer.
increase shuffling streaming buffer to 128m.
use KryoSerializer and max out all buffers
increase shuffling memoryFraction to 0.4
But none of them works. The small job always trigger the same series of
errors and max out retries (upt to 1000 times). How to troubleshoot this
thing in such situation?

Thanks a lot if you have any clue.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-are-the-likely-causes-of-org-apache-spark-shuffle-MetadataFetchFailedException-Missing-an-outpu-tp22646.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Performance on Yarn

2015-04-20 Thread Peng Cheng
I got exactly the same problem, except that I'm running on a standalone
master. Can you tell me the counterpart parameter on standalone master for
increasing the same memroy overhead?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22580.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to avoid “Invalid checkpoint directory” error in apache Spark?

2015-04-17 Thread Peng Cheng
I'm using Amazon EMR + S3 as my spark cluster infrastructure. When I'm
running a job with periodic checkpointing (it has a long dependency tree, so
truncating by checkpointing is mandatory, each checkpoint has 320
partitions). The job stops halfway, resulting an exception:

(On driver)
org.apache.spark.SparkException: Invalid checkpoint directory:
s3n://spooky-checkpoint/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198
at
org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
...

(On Executor)
15/04/17 22:00:14 WARN StorageService: Encountered 4 Internal Server
error(s), will retry in 800ms
15/04/17 22:00:15 WARN RestStorageService: Retrying request following error
response: PUT '/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025' --
ResponseCode: 500, ResponseStatus: Internal Server Error
...

After manually checking checkpointed files I found that
/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025 is indeed missing
on S3. So my question is: if it is missing (perhaps due to AWS malfunction),
why didn't spark detect it immediately in the checkpointing process (so it
can be retried), instead of throwing an irrecoverable error stating that
dependency tree is already lost? And how to avoid this situation from
happening again?

I don't think this problem is addressed before because HDFS is assumed to be
immediately consistent (unlike S3 which is eventually consistent) and
extremely resilient. However every component has a chance of breakdown, can
you share your best practice of checkpointing?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Invalid-checkpoint-directory-error-in-apache-Spark-tp22548.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: refer to dictionary

2015-03-31 Thread Peng Xia
Hi Ted,

Thanks very much, yea, using broadcast is much faster.

Best,
Peng

On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu  wrote:

> You can use broadcast variable.
>
> See also this thread:
>
> http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variable&subj=How+Broadcast+variable+scale+
>
>
>
> > On Mar 31, 2015, at 4:43 AM, Peng Xia  wrote:
> >
> > Hi,
> >
> > I have a RDD (rdd1)where each line is split into an array ["a", "b",
> "c], etc.
> > And I also have a local dictionary p (dict1) stores key value pair
> {"a":1, "b": 2, c:3}
> > I want to replace the keys in the rdd with the its corresponding value
> in the dict:
> > rdd1.map(lambda line: [dict1[item] for item in line])
> >
> > But this task is not distributed, I believe the reason is the dict1 is a
> local instance.
> > Can any one provide suggestions on this to parallelize this?
> >
> >
> > Thanks,
> > Best,
> > Peng
> >
>


refer to dictionary

2015-03-31 Thread Peng Xia
Hi,

I have a RDD (rdd1)where each line is split into an array ["a", "b", "c],
etc.
And I also have a local dictionary p (dict1) stores key value pair {"a":1,
"b": 2, c:3}
I want to replace the keys in the rdd with the its corresponding value in
the dict:
rdd1.map(lambda line: [dict1[item] for item in line])

But this task is not distributed, I believe the reason is the dict1 is a
local instance.
Can any one provide suggestions on this to parallelize this?


Thanks,
Best,
Peng


Re: spark there is no space on the disk

2015-03-31 Thread Peng Xia
Yes, we have just modified the configuration, and every thing works fine.
Thanks very much for the help.

On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu  wrote:

> For YARN, possibly this one ?
>
> 
>   yarn.nodemanager.local-dirs
>   /hadoop/yarn/local
> 
>
> Cheers
>
> On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin 
> wrote:
>
>> IIRC you have to set that configuration on the Worker processes (for
>> standalone). The app can't override it (only for a client-mode
>> driver). YARN has a similar configuration, but I don't know the name
>> (shouldn't be hard to find, though).
>>
>> On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu 
>> wrote:
>> > Is it possible that `spark.local.dir` is overriden by others? The docs
>> say:
>> >
>> > NOTE: In Spark 1.0 and later this will be overriden by
>> > SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)
>> >
>> > On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia 
>> wrote:
>> >> Hi Sean,
>> >>
>> >> Thank very much for your reply.
>> >> I tried to config it from below code:
>> >>
>> >> sf = SparkConf().setAppName("test").set("spark.executor.memory",
>> >> "45g").set("spark.cores.max", 62),set("spark.local.dir", "C:\\tmp")
>> >>
>> >> But still get the error.
>> >> Do you know how I can config this?
>> >>
>> >>
>> >> Thanks,
>> >> Best,
>> >> Peng
>> >>
>> >>
>> >> On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen  wrote:
>> >>>
>> >>> It means pretty much what it says. You ran out of space on an executor
>> >>> (not driver), because the dir used for serialization temp files is
>> >>> full (not all volumes). Set spark.local.dirs to something more
>> >>> appropriate and larger.
>> >>>
>> >>> On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia 
>> wrote:
>> >>> > Hi
>> >>> >
>> >>> >
>> >>> > I was running a logistic regression algorithm on a 8 nodes spark
>> >>> > cluster,
>> >>> > each node has 8 cores and 56 GB Ram (each node is running a windows
>> >>> > system).
>> >>> > And the spark installation driver has 1.9 TB capacity. The dataset
>> I was
>> >>> > training on are has around 40 million records with around 6600
>> features.
>> >>> > But
>> >>> > I always get this error during the training process:
>> >>> >
>> >>> > Py4JJavaError: An error occurred while calling
>> >>> > o70.trainLogisticRegressionModelWithLBFGS.
>> >>> > : org.apache.spark.SparkException: Job aborted due to stage failure:
>> >>> > Task
>> >>> > 2709 in stage 3.0 failed 4 times, most recent failure: Lost task
>> 2709.3
>> >>> > in
>> >>> > stage 3.0 (TID 2766,
>> >>> > workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
>> >>> > java.io.IOException: There is not enough space on the disk
>> >>> > at java.io.FileOutputStream.writeBytes(Native Method)
>> >>> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
>> >>> > at
>> >>> > java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >>> > at
>> >>> >
>> >>> >
>> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
>> >>> > at
>> >>> >
>> >>> >
>> org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
>> >>> > at
>> >>> >
>> org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
>> >>> > at
>> >>> >
>> >>> >
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>> >>> > at
>> >>> >
>> >>> >
>> java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
>> >>> > at
>> >>> >
>> >>> >
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
>> >>> > at
&

Re: Can I start multiple executors in local mode?

2015-03-16 Thread xu Peng
Hi David,

You can try the local-cluster.

the number in local-cluster[2,2,1024] represents that there are 2 worker, 2
cores and 1024M

Best Regards

Peng Xu

2015-03-16 19:46 GMT+08:00 Xi Shen :

> Hi,
>
> In YARN mode you can specify the number of executors. I wonder if we can
> also start multiple executors at local, just to make the test run faster.
>
> Thanks,
> David
>


Re: spark there is no space on the disk

2015-03-14 Thread Peng Xia
And I have 2 TB free space on C driver.

On Sat, Mar 14, 2015 at 8:29 PM, Peng Xia  wrote:

> Hi Sean,
>
> Thank very much for your reply.
> I tried to config it from below code:
>
> sf = SparkConf().setAppName("test").set("spark.executor.memory", 
> "45g").set("spark.cores.max", 62),set("spark.local.dir", "C:\\tmp")
>
> But still get the error.
> Do you know how I can config this?
>
>
> Thanks,
> Best,
> Peng
>
>
> On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen  wrote:
>
>> It means pretty much what it says. You ran out of space on an executor
>> (not driver), because the dir used for serialization temp files is
>> full (not all volumes). Set spark.local.dirs to something more
>> appropriate and larger.
>>
>> On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia  wrote:
>> > Hi
>> >
>> >
>> > I was running a logistic regression algorithm on a 8 nodes spark
>> cluster,
>> > each node has 8 cores and 56 GB Ram (each node is running a windows
>> system).
>> > And the spark installation driver has 1.9 TB capacity. The dataset I was
>> > training on are has around 40 million records with around 6600
>> features. But
>> > I always get this error during the training process:
>> >
>> > Py4JJavaError: An error occurred while calling
>> > o70.trainLogisticRegressionModelWithLBFGS.
>> > : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task
>> > 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3
>> in
>> > stage 3.0 (TID 2766,
>> > workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
>> > java.io.IOException: There is not enough space on the disk
>> > at java.io.FileOutputStream.writeBytes(Native Method)
>> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
>> > at
>> java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> > at
>> >
>> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
>> > at
>> >
>> org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
>> > at
>> > org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
>> > at
>> >
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>> > at
>> >
>> java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
>> > at
>> >
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
>> > at
>> > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>> > at
>> >
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>> > at
>> >
>> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
>> > at
>> >
>> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
>> > at
>> > org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
>> > at
>> > org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
>> > at
>> >
>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>> > at
>> > org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
>> > at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
>> > at
>> org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>> > at
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>> > at
>> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> > at
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>> > at
>> >
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> > at
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>>

Re: spark there is no space on the disk

2015-03-14 Thread Peng Xia
Hi Sean,

Thank very much for your reply.
I tried to config it from below code:

sf = SparkConf().setAppName("test").set("spark.executor.memory",
"45g").set("spark.cores.max", 62),set("spark.local.dir", "C:\\tmp")

But still get the error.
Do you know how I can config this?


Thanks,
Best,
Peng


On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen  wrote:

> It means pretty much what it says. You ran out of space on an executor
> (not driver), because the dir used for serialization temp files is
> full (not all volumes). Set spark.local.dirs to something more
> appropriate and larger.
>
> On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia  wrote:
> > Hi
> >
> >
> > I was running a logistic regression algorithm on a 8 nodes spark cluster,
> > each node has 8 cores and 56 GB Ram (each node is running a windows
> system).
> > And the spark installation driver has 1.9 TB capacity. The dataset I was
> > training on are has around 40 million records with around 6600 features.
> But
> > I always get this error during the training process:
> >
> > Py4JJavaError: An error occurred while calling
> > o70.trainLogisticRegressionModelWithLBFGS.
> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> > 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3
> in
> > stage 3.0 (TID 2766,
> > workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
> > java.io.IOException: There is not enough space on the disk
> > at java.io.FileOutputStream.writeBytes(Native Method)
> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
> > at
> java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> > at
> >
> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
> > at
> >
> org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
> > at
> > org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
> > at
> >
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
> > at
> >
> java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
> > at
> >
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
> > at
> > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
> > at
> >
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> > at
> >
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
> > at
> >
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
> > at
> > org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
> > at
> > org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
> > at
> > org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
> > at
> > org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
> > at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
> > at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
> > at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
> > at
> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > at org.apache.spark.scheduler.Task.run(Task.scala:56)
> > at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > Driver stacktrace:
> > at
> > org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
> > at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > 

Re: Loading in json with spark sql

2015-03-13 Thread Kevin Peng
Yin,

Yup thanks.  I fixed that shortly after I posted and it worked.

Thanks,

Kevin

On Fri, Mar 13, 2015 at 8:28 PM, Yin Huai  wrote:

> Seems you want to use array for the field of "providers", like 
> "providers":[{"id":
> ...}, {"id":...}] instead of "providers":{{"id": ...}, {"id":...}}
>
> On Fri, Mar 13, 2015 at 7:45 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I was noodling around with loading in a json file into spark sql's hive
>> context and I noticed that I get the following message after loading in
>> the
>> json file:
>> PhysicalRDD [_corrupt_record#0], MappedRDD[5] at map at JsonRDD.scala:47
>>
>> I am using the HiveContext to load in the json file using the jsonFile
>> command.  I also have 1 json object per line on the file.  Here is a
>> sample
>> of the contents in the json file:
>>
>> {"user_id":"7070","providers":{{"id":"8753","name":"pjfig","behaviors":{"b1":"erwxt","b2":"yjooj"}},{"id":"8329","name":"dfvhh","behaviors":{"b1":"pjjdn","b2":"ooqsh"
>>
>> {"user_id":"1615","providers":{{"id":"6105","name":"rsfon","behaviors":{"b1":"whlje","b2":"lpjnq"}},{"id":"6828","name":"pnmrb","behaviors":{"b1":"fjpmz","b2":"dxqxk"
>>
>> {"user_id":"5210","providers":{{"id":"9360","name":"xdylm","behaviors":{"b1":"gcdze","b2":"cndcs"}},{"id":"4812","name":"gxboh","behaviors":{"b1":"qsxao","b2":"ixdzq"
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Loading-in-json-with-spark-sql-tp22044.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


spark there is no space on the disk

2015-03-13 Thread Peng Xia
Hi


I was running a logistic regression algorithm on a 8 nodes spark cluster,
each node has 8 cores and 56 GB Ram (each node is running a windows
system). And the spark installation driver has 1.9 TB capacity. The dataset
I was training on are has around 40 million records with around 6600
features. But I always get this error during the training process:

Py4JJavaError: An error occurred while calling
o70.trainLogisticRegressionModelWithLBFGS.:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task
2709.3 in stage 3.0 (TID 2766,
workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
java.io.IOException: There is not enough space on the disk
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at 
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
at 
org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
at 
org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at 
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: spark sql writing in avro

2015-03-13 Thread Kevin Peng
Markus,

Thanks.  That makes sense.  I was able to get this to work with spark-shell
passing in the git built jar.  I did notice that I couldn't get
AvroSaver.save to work with SQLContext, but it works with HiveContext.  Not
sure if that is an issue, but for me, it is fine.

Once again, thanks for the help.

Kevin

On Fri, Mar 13, 2015 at 1:57 PM, M. Dale  wrote:

> I probably did not do a good enough job explaining the problem. If you
> used Maven with the
> default Maven repository you have an old version of spark-avro that does
> not contain AvroSaver and does not have the saveAsAvro method implemented:
>
> Assuming you use the default Maven repo location:
> cd ~/.m2/repository/com/databricks/spark-avro_2.10/0.1
> jar tvf spark-avro_2.10-0.1.jar | grep AvroSaver
>
> Comes up empty. The jar file does not contain this class because
> AvroSaver.scala wasn't added until Jan 21. The jar file is from 14 November.
>
> So:
> git clone g...@github.com:databricks/spark-avro.git
> cd spark-avro
> sbt publish-m2
>
> This publishes the latest master code (this includes AvroSaver etc.) to
> your local Maven repo and Maven will pick up the latest version of
> spark-avro (for this machine).
>
> Now you should be able to compile and run.
>
> HTH,
> Markus
>
>
> On 03/12/2015 11:55 PM, Kevin Peng wrote:
>
> Dale,
>
>  I basically have the same maven dependency above, but my code will not
> compile due to not being able to reference to AvroSaver, though the
> saveAsAvro reference compiles fine, which is weird.  Eventhough saveAsAvro
> compiles for me, it errors out when running the spark job due to it not
> being implemented (the job quits and says non implemented method or
> something along those lines).
>
>  I will try going the spark shell and passing in the jar built from
> github since I haven't tried that quite yet.
>
> On Thu, Mar 12, 2015 at 6:44 PM, M. Dale  wrote:
>
>> Short answer: if you downloaded spark-avro from the repo.maven.apache.org
>> repo you might be using an old version (pre-November 14, 2014) -
>> see timestamps at
>> http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/
>> Lots of changes at https://github.com/databricks/spark-avro since then.
>>
>> Databricks, thank you for sharing the Avro code!!!
>>
>> Could you please push out the latest version or update the version
>> number and republish to repo.maven.apache.org (I have no idea how jars
>> get
>> there). Or is there a different repository that users should point to for
>> this artifact?
>>
>> Workaround: Download from https://github.com/databricks/spark-avro and
>> build
>> with latest functionality (still version 0.1) and add to your local Maven
>> or Ivy repo.
>>
>> Long version:
>> I used a default Maven build and declared my dependency on:
>>
>> 
>> com.databricks
>> spark-avro_2.10
>> 0.1
>> 
>>
>> Maven downloaded the 0.1 version from
>> http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/
>> and included it in my app code jar.
>>
>> From spark-shell:
>>
>> import com.databricks.spark.avro._
>> import org.apache.spark.sql.SQLContext
>> val sqlContext = new SQLContext(sc)
>>
>> # This schema includes LONG for time in millis (
>> https://github.com/medale/spark-mail/blob/master/mailrecord/src/main/avro/com/uebercomputing/mailrecord/MailRecord.avdl
>> )
>> val recordsSchema = sqlContext.avroFile("/opt/rpm1/enron/enron-tiny.avro")
>> java.lang.RuntimeException: Unsupported type LONG
>>
>> However, checking out the spark-avro code from its GitHub repo and adding
>> a test case against the MailRecord avro everything ran fine.
>>
>> So I built the databricks spark-avro locally on my box and then put it in
>> my
>> local Maven repo - everything worked from spark-shell when adding that jar
>> as dependency.
>>
>> Hope this helps for the "save" case as well. On the pre-14NOV version,
>> avro.scala
>> says:
>>  // TODO: Implement me.
>>   implicit class AvroSchemaRDD(schemaRDD: SchemaRDD) {
>> def saveAsAvroFile(path: String): Unit = ???
>>   }
>>
>> Markus
>>
>> On 03/12/2015 07:05 PM, kpeng1 wrote:
>>
>>> Hi All,
>>>
>>> I am current trying to write out a scheme RDD to avro.  I noticed that
>>> there
>>> is a databricks spark-avro library and I have included that in my
>>> dependencies, but it looks like I am not able to access the AvroSaver
>>> 

Re: spark sql writing in avro

2015-03-12 Thread Kevin Peng
Dale,

I basically have the same maven dependency above, but my code will not
compile due to not being able to reference to AvroSaver, though the
saveAsAvro reference compiles fine, which is weird.  Eventhough saveAsAvro
compiles for me, it errors out when running the spark job due to it not
being implemented (the job quits and says non implemented method or
something along those lines).

I will try going the spark shell and passing in the jar built from github
since I haven't tried that quite yet.

On Thu, Mar 12, 2015 at 6:44 PM, M. Dale  wrote:

> Short answer: if you downloaded spark-avro from the repo.maven.apache.org
> repo you might be using an old version (pre-November 14, 2014) -
> see timestamps at http://repo.maven.apache.org/
> maven2/com/databricks/spark-avro_2.10/0.1/
> Lots of changes at https://github.com/databricks/spark-avro since then.
>
> Databricks, thank you for sharing the Avro code!!!
>
> Could you please push out the latest version or update the version
> number and republish to repo.maven.apache.org (I have no idea how jars get
> there). Or is there a different repository that users should point to for
> this artifact?
>
> Workaround: Download from https://github.com/databricks/spark-avro and
> build
> with latest functionality (still version 0.1) and add to your local Maven
> or Ivy repo.
>
> Long version:
> I used a default Maven build and declared my dependency on:
>
> 
> com.databricks
> spark-avro_2.10
> 0.1
> 
>
> Maven downloaded the 0.1 version from http://repo.maven.apache.org/
> maven2/com/databricks/spark-avro_2.10/0.1/ and included it in my app code
> jar.
>
> From spark-shell:
>
> import com.databricks.spark.avro._
> import org.apache.spark.sql.SQLContext
> val sqlContext = new SQLContext(sc)
>
> # This schema includes LONG for time in millis (https://github.com/medale/
> spark-mail/blob/master/mailrecord/src/main/avro/com/
> uebercomputing/mailrecord/MailRecord.avdl)
> val recordsSchema = sqlContext.avroFile("/opt/rpm1/enron/enron-tiny.avro")
> java.lang.RuntimeException: Unsupported type LONG
>
> However, checking out the spark-avro code from its GitHub repo and adding
> a test case against the MailRecord avro everything ran fine.
>
> So I built the databricks spark-avro locally on my box and then put it in
> my
> local Maven repo - everything worked from spark-shell when adding that jar
> as dependency.
>
> Hope this helps for the "save" case as well. On the pre-14NOV version,
> avro.scala
> says:
>  // TODO: Implement me.
>   implicit class AvroSchemaRDD(schemaRDD: SchemaRDD) {
> def saveAsAvroFile(path: String): Unit = ???
>   }
>
> Markus
>
> On 03/12/2015 07:05 PM, kpeng1 wrote:
>
>> Hi All,
>>
>> I am current trying to write out a scheme RDD to avro.  I noticed that
>> there
>> is a databricks spark-avro library and I have included that in my
>> dependencies, but it looks like I am not able to access the AvroSaver
>> object.  On compilation of the job I get this:
>> error: not found: value AvroSaver
>> [ERROR] AvroSaver.save(resultRDD, args(4))
>>
>> I also tried calling saveAsAvro on the resultRDD(the actual rdd with the
>> results) and that passes compilation, but when I run the code I get an
>> error
>> that says the saveAsAvro is not implemented.  I am using version 0.1 of
>> spark-avro_2.10
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/spark-sql-writing-in-avro-tp22021.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: error on training with logistic regression sgd

2015-03-10 Thread Peng Xia
Hi,

Can anyone give an idea about this?
Just did some google search, it seems related to the 2gb limitation on
block size, https://issues.apache.org/jira/browse/SPARK-1476.
The whole process is that:
1. load the data
2. convert each line of data into labeled points using some feature hashing
algorithm in python.
3. train a logistic regression model with  the converted labeled points.
Can any one give some advice for how to avoid the 2gb, if this is the cause?
Thanks very much for the help.

Best,
Peng

On Mon, Mar 9, 2015 at 3:54 PM, Peng Xia  wrote:

> Hi,
>
> I was launching a spark cluster with 4 work nodes, each work nodes
> contains 8 cores and 56gb ram, and I was testing my logistic regression
> problem.
> The training set is around 1.2 million records.When I was using 2**10
> (1024) features, the whole program works fine, but when I use 2**14
> features, the program has encountered the error:
>
> Py4JJavaError: An error occurred while calling 
> o84.trainLogisticRegressionModelWithSGD.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 
> in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 
> (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): 
> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>   at 
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>   at 
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>   at j

error on training with logistic regression sgd

2015-03-09 Thread Peng Xia
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



The data are transformed to LabeledPoint and I was using pyspark for this.
Can anyone help me on this?


Thanks,
Best,
Peng


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Marcelo,

Thanks.  The one in the CDH repo fixed it :)

On Wed, Mar 4, 2015 at 4:37 PM, Marcelo Vanzin  wrote:

> Hi Kevin,
>
> If you're using CDH, I'd recommend using the CDH repo [1], and also
> the CDH version when building your app.
>
> [1]
> http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html
>
> On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng  wrote:
> > Ted,
> >
> > I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not
> too
> > sure about the compatibility issues between 1.2.0 and 1.2.1, that is why
> I
> > would want to stick to 1.2.0.
> >
> >
> >
> > On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu  wrote:
> >>
> >> Kevin:
> >> You can try with 1.2.1
> >>
> >> See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
> >>
> >> Cheers
> >>
> >> On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng  wrote:
> >>>
> >>> Marcelo,
> >>>
> >>> Yes that is correct, I am going through a mirror, but 1.1.0 works
> >>> properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0
> pom
> >>> file.
> >>>
> >>> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
> >>> wrote:
> >>>>
> >>>> Seems like someone set up "m2.mines.com" as a mirror in your pom file
> >>>> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
> >>>> in a messed up state).
> >>>>
> >>>> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
> >>>> > Hi All,
> >>>> >
> >>>> > I am currently having problem with the maven dependencies for
> version
> >>>> > 1.2.0
> >>>> > of spark-core and spark-hive.  Here are my dependencies:
> >>>> > 
> >>>> >   org.apache.spark
> >>>> >   spark-core_2.10
> >>>> >   1.2.0
> >>>> > 
> >>>> > 
> >>>> >   org.apache.spark
> >>>> >   spark-hive_2.10
> >>>> >   1.2.0
> >>>> > 
> >>>> >
> >>>> > When the dependencies are set to version 1.1.0, I do not get any
> >>>> > errors.
> >>>> > Here are the errors I am getting from artifactory for version 1.2.0
> of
> >>>> > spark-core:
> >>>> > error=Could not transfer artifact
> >>>> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
> >>>> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
> >>>> > file\:
> >>>> >
> >>>> > https\://m2.mines.com
> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
> >>>> > Return code is\: 409 , ReasonPhrase\:Conflict.
> >>>> >
> >>>> > The error is the same for spark-hive.
> >>>> >
> >>>> >
> >>>> >
> >>>> >
> >>>> > --
> >>>> > View this message in context:
> >>>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
> >>>> > Sent from the Apache Spark User List mailing list archive at
> >>>> > Nabble.com.
> >>>> >
> >>>> >
> -
> >>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>>> > For additional commands, e-mail: user-h...@spark.apache.org
> >>>> >
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Marcelo
> >>>
> >>>
> >>
> >
>
>
>
> --
> Marcelo
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Ted,

I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I
would want to stick to 1.2.0.



On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu  wrote:

> Kevin:
> You can try with 1.2.1
>
> See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
>
> Cheers
>
> On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng  wrote:
>
>> Marcelo,
>>
>> Yes that is correct, I am going through a mirror, but 1.1.0 works
>> properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
>> file.
>>
>> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
>> wrote:
>>
>>> Seems like someone set up "m2.mines.com" as a mirror in your pom file
>>> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
>>> in a messed up state).
>>>
>>> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
>>> > Hi All,
>>> >
>>> > I am currently having problem with the maven dependencies for version
>>> 1.2.0
>>> > of spark-core and spark-hive.  Here are my dependencies:
>>> > 
>>> >   org.apache.spark
>>> >   spark-core_2.10
>>> >   1.2.0
>>> > 
>>> > 
>>> >   org.apache.spark
>>> >   spark-hive_2.10
>>> >   1.2.0
>>> > 
>>> >
>>> > When the dependencies are set to version 1.1.0, I do not get any
>>> errors.
>>> > Here are the errors I am getting from artifactory for version 1.2.0 of
>>> > spark-core:
>>> > error=Could not transfer artifact
>>> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
>>> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
>>> file\:
>>> > https\://m2.mines.com
>>> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
>>> > Return code is\: 409 , ReasonPhrase\:Conflict.
>>> >
>>> > The error is the same for spark-hive.
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Ted,

I have tried wiping out ~/.m2/org.../spark directory multiple times.  It
doesn't seem to work.



On Wed, Mar 4, 2015 at 4:20 PM, Ted Yu  wrote:

> kpeng1:
> Try wiping out ~/.m2 and build again.
>
> Cheers
>
> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
> wrote:
>
>> Seems like someone set up "m2.mines.com" as a mirror in your pom file
>> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
>> in a messed up state).
>>
>> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
>> > Hi All,
>> >
>> > I am currently having problem with the maven dependencies for version
>> 1.2.0
>> > of spark-core and spark-hive.  Here are my dependencies:
>> > 
>> >   org.apache.spark
>> >   spark-core_2.10
>> >   1.2.0
>> > 
>> > 
>> >   org.apache.spark
>> >   spark-hive_2.10
>> >   1.2.0
>> > 
>> >
>> > When the dependencies are set to version 1.1.0, I do not get any errors.
>> > Here are the errors I am getting from artifactory for version 1.2.0 of
>> > spark-core:
>> > error=Could not transfer artifact
>> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
>> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
>> file\:
>> > https\://m2.mines.com
>> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
>> > Return code is\: 409 , ReasonPhrase\:Conflict.
>> >
>> > The error is the same for spark-hive.
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Marcelo,

Yes that is correct, I am going through a mirror, but 1.1.0 works properly,
while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom file.

On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin  wrote:

> Seems like someone set up "m2.mines.com" as a mirror in your pom file
> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
> in a messed up state).
>
> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
> > Hi All,
> >
> > I am currently having problem with the maven dependencies for version
> 1.2.0
> > of spark-core and spark-hive.  Here are my dependencies:
> > 
> >   org.apache.spark
> >   spark-core_2.10
> >   1.2.0
> > 
> > 
> >   org.apache.spark
> >   spark-hive_2.10
> >   1.2.0
> > 
> >
> > When the dependencies are set to version 1.1.0, I do not get any errors.
> > Here are the errors I am getting from artifactory for version 1.2.0 of
> > spark-core:
> > error=Could not transfer artifact
> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
> file\:
> > https\://m2.mines.com
> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
> > Return code is\: 409 , ReasonPhrase\:Conflict.
> >
> > The error is the same for spark-hive.
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>
>
> --
> Marcelo
>


Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
I double check the 1.2 feature list and found out that the new sort-based
shuffle manager has nothing to do with HashPartitioner :-< Sorry for the
misinformation.

In another hand. This may explain increase in shuffle spill as a side effect
of the new shuffle manager, let me revert spark.shuffle.manager to hash and
see if it make things better (or worse, as the benchmark in
https://issues.apache.org/jira/browse/SPARK-3280 indicates)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
Same problem here, shuffle write increased from 10G to over 64G, since I'm
running on amazon EC2 this always cause temporary folder to consume all the
disk space. Still looking for a solution.

BTW, the 64G shuffle write is encountered on shuffling a pairRDD with
HashPartitioner, so its not related to Spark 1.2.0's new features

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?

2015-02-11 Thread Peng Cheng
You are right. I've checked the overall stage metrics and looks like the
largest shuffling write is over 9G. The partition completed successfully
but its spilled file can't be removed until all others are finished.
It's very likely caused by a stupid mistake in my design. A lookup table
grows constantly in a loop, every time its union with a new increment will
results in both of them being reshuffled, and partitioner reverted to None.
This can never be efficient with existing API.


Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?

2015-02-10 Thread Peng Cheng
I'm running a small job on a cluster with 15G of mem and 8G of disk per
machine.

The job always get into a deadlock where the last error message is:

java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
at 
org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at 
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
at 
org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
at 
org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

By the time it happens the shuffle write size is 0.0B and input size
is 3.4MB. I wonder what operation could quickly eat up the entire 5G
free disk space.

In addition, The storage level of the entire job is confined to
MEMORY_ONLY_SERIALIZED and checkpointing is completely disabled.


Is LogisticRegressionWithSGD in MLlib scalable?

2015-02-03 Thread Peng Zhang
Hi Everyone,

Is LogisticRegressionWithSGD in MLlib scalable? 

If so, what is the idea behind the scalable implementation?

Thanks in advance,

Peng





-
Peng Zhang
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-LogisticRegressionWithSGD-in-MLlib-scalable-tp21482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.IllegalStateException: unread block data

2015-02-02 Thread Peng Cheng
I got the same problem, maybe java serializer is unstable



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p21463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
  ||
 V   V
 RDD3 -> RDD4 -> Action!

To my experience the change RDD1 get recalculated is volatile, sometimes
once, sometimes twice. When calculation of this RDD is expensive (e.g.
involves using an RESTful service that charges me money), this compels me
to persist RDD1 which takes extra memory, and in case the Action! doesn't
always happen, I don't know when to unpersist it to  free those memory.

A related problem might be in $SQLContest.jsonRDD(), since the source
jsonRDD is used twice (one for schema inferring, another for data read). It
almost guarantees that the source jsonRDD is calculated twice. Is there a
way to solve (or circumvent) this problem?


If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
  ||
 V   V
 RDD3 -> RDD4 -> Action!

To my experience the change RDD1 get recalculated is volatile, sometimes
once, sometimes twice. When calculation of this RDD is expensive (e.g.
involves using an RESTful service that charges me money), this compels me to
persist RDD1 which takes extra memory, and in case the Action! doesn't
always happen, I don't know when to unpersist it to  free those memory.

A related problem might be in $SQLContest.jsonRDD(), since the source
jsonRDD is used twice (one for schema inferring, another for data read). It
almost guarantees that the source jsonRDD is calculated twice. Has this
problem be addressed so far?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/If-an-RDD-appeared-twice-in-a-DAG-of-which-calculation-is-triggered-by-a-single-action-will-this-RDD-tp21192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
Not if broadcast can only be used between stages. To enable this you have
to at least make broadcast asynchronous & non-blocking.

On 9 January 2015 at 18:02, Krishna Sankar  wrote:

> I am also looking at this domain. We could potentially use the broadcast
> capability in Spark to distribute the parameters. Haven't thought thru yet.
> Cheers
> 
>
> On Fri, Jan 9, 2015 at 2:56 PM, Andrei  wrote:
>
>> Does it makes sense to use Spark's actor system (e.g. via
>> SparkContext.env.actorSystem) to create parameter server?
>>
>> On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng  wrote:
>>
>>> You are not the first :) probably not the fifth to have the question.
>>> parameter server is not included in spark framework and I've seen all
>>> kinds of hacking to improvise it: REST api, HDFS, tachyon, etc.
>>> Not sure if an 'official' benchmark & implementation will be released
>>> soon
>>>
>>> On 9 January 2015 at 10:59, Marco Shaw  wrote:
>>>
>>>> Pretty vague on details:
>>>>
>>>>
>>>> http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199
>>>>
>>>>
>>>> On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa 
>>>> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> DeepLearning algorithms are popular and achieve many state of the art
>>>> performance in several real world machine learning problems. Currently
>>>> there are no DL implementation in spark and I wonder if there is an ongoing
>>>> work on this topics.
>>>>
>>>> We can do DL in spark Sparkling water and H2O but this adds an
>>>> additional software stack.
>>>>
>>>> Deeplearning4j seems to implements a distributed version of many
>>>> popural DL algorithm. Porting DL4j in Spark can be interesting.
>>>>
>>>> Google describes an implementation of a large scale DL in this paper
>>>> http://research.google.com/archive/large_deep_networks_nips2012.html.
>>>> Based on model parallelism and data parallelism.
>>>>
>>>> So, I'm trying to imaging what should be a good design for DL algorithm
>>>> in Spark ? Spark already have RDD (for data parallelism). Can GraphX be
>>>> used for the model parallelism (as DNN are generally designed as DAG) ? And
>>>> what about using GPUs to do local parallelism (mecanism to push partition
>>>> into GPU memory ) ?
>>>>
>>>>
>>>> What do you think about this ?
>>>>
>>>>
>>>> Cheers,
>>>>
>>>> Jao
>>>>
>>>>
>>>
>>
>


Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
You are not the first :) probably not the fifth to have the question.
parameter server is not included in spark framework and I've seen all kinds
of hacking to improvise it: REST api, HDFS, tachyon, etc.
Not sure if an 'official' benchmark & implementation will be released soon

On 9 January 2015 at 10:59, Marco Shaw  wrote:

> Pretty vague on details:
>
> http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199
>
>
> On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa  wrote:
>
> Hi all,
>
> DeepLearning algorithms are popular and achieve many state of the art
> performance in several real world machine learning problems. Currently
> there are no DL implementation in spark and I wonder if there is an ongoing
> work on this topics.
>
> We can do DL in spark Sparkling water and H2O but this adds an additional
> software stack.
>
> Deeplearning4j seems to implements a distributed version of many popural
> DL algorithm. Porting DL4j in Spark can be interesting.
>
> Google describes an implementation of a large scale DL in this paper
> http://research.google.com/archive/large_deep_networks_nips2012.html.
> Based on model parallelism and data parallelism.
>
> So, I'm trying to imaging what should be a good design for DL algorithm in
> Spark ? Spark already have RDD (for data parallelism). Can GraphX be used
> for the model parallelism (as DNN are generally designed as DAG) ? And what
> about using GPUs to do local parallelism (mecanism to push partition into
> GPU memory ) ?
>
>
> What do you think about this ?
>
>
> Cheers,
>
> Jao
>
>


Re: Is it possible to do incremental training using ALSModel (MLlib)?

2015-01-02 Thread Peng Cheng
I was under the impression that ALS wasn't designed for it :-< The famous
ebay online recommender uses SGD
However, you can try using the previous model as starting point, and
gradually reduce the number of iteration after the model stablize. I never
verify this idea, so you need to at least cross-validate it before putting
into productio

On 2 January 2015 at 04:40, Wouter Samaey 
wrote:

> Hi all,
>
> I'm curious about MLlib and if it is possible to do incremental training on
> the ALSModel.
>
> Usually training is run first, and then you can query. But in my case, data
> is collected in real-time and I want the predictions of my ALSModel to
> consider the latest data without complete re-training phase.
>
> I've checked out these resources, but could not find any info on how to
> solve this:
> https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
>
> http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
>
> My question fits in a larger picture where I'm using Prediction IO, and
> this
> in turn is based on Spark.
>
> Thanks in advance for any advice!
>
> Wouter
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Announcing Spark Packages

2014-12-22 Thread peng

Me 2 :)

On 12/22/2014 06:14 PM, Andrew Ash wrote:

Hi Xiangrui,

That link is currently returning a 503 Over Quota error message.  
Would you mind pinging back out when the page is back up?


Thanks!
Andrew

On Mon, Dec 22, 2014 at 12:37 PM, Xiangrui Meng > wrote:


Dear Spark users and developers,

I’m happy to announce Spark Packages (http://spark-packages.org), a
community package index to track the growing number of open source
packages and libraries that work with Apache Spark. Spark Packages
makes it easy for users to find, discuss, rate, and install packages
for any version of Spark, and makes it easy for developers to
contribute packages.

Spark Packages will feature integrations with various data sources,
management tools, higher level domain-specific libraries, machine
learning algorithms, code samples, and other Spark content. Thanks to
the package authors, the initial listing of packages includes
scientific computing libraries, a job execution server, a connector
for importing Avro data, tools for launching Spark on Google Compute
Engine, and many others.

I’d like to invite you to contribute and use Spark Packages and
provide feedback! As a disclaimer: Spark Packages is a community index
maintained by Databricks and (by design) will include packages outside
of the ASF Spark project. We are excited to help showcase and support
all of the great work going on in the broader Spark community!

Cheers,
Xiangrui

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







Re: spark-repl_1.2.0 was not uploaded to central maven repository.

2014-12-22 Thread peng

Thanks a lot for point it out. I also found it in pom.xml.
A new ticket for reverting it has been submitted: 
https://issues.apache.org/jira/browse/SPARK-4923
At first I assume that further development on it has been moved to 
databricks cloud. But the JIRA ticket was already there in September. So 
maybe demand on this API from the community is indeed low enough.
However, I would still suggest keeping it, even promoting it into a 
Developer's API, this would encourage more projects to integrate in a 
more flexible way, and save prototyping/QA cost by customizing fixtures 
of REPL. People will still move to databricks cloud, which has far more 
features than that. Many influential projects already depends on the 
routinely published Scala-REPL (e.g. playFW), it would be strange for 
Spark not doing the same.

What do you think?

Yours Peng

On 12/22/2014 04:57 PM, Sean Owen wrote:

Just closing the loop -- FWIW this was indeed on purpose --
https://issues.apache.org/jira/browse/SPARK-3452 . I take it that it's
not encouraged to depend on the REPL as a module.

On Sun, Dec 21, 2014 at 10:34 AM, Sean Owen  wrote:

I'm only speculating, but I wonder if it was on purpose? would people
ever build an app against the REPL?

On Sun, Dec 21, 2014 at 5:50 AM, Peng Cheng  wrote:

Everything else is there except spark-repl. Can someone check that out this
weekend?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-repl-1-2-0-was-not-uploaded-to-central-maven-repository-tp20799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on Tachyon

2014-12-20 Thread Peng Cheng
IMHO: cache doesn't provide redundancy, and its in the same jvm, so its much
faster.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Tachyon-tp1463p20800.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark-repl_1.2.0 was not uploaded to central maven repository.

2014-12-20 Thread Peng Cheng
Everything else is there except spark-repl. Can someone check that out this
weekend?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-repl-1-2-0-was-not-uploaded-to-central-maven-repository-tp20799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




How to extend an one-to-one RDD of Spark that can be persisted?

2014-12-04 Thread Peng Cheng
In my project I extend a new RDD type that wraps another RDD and some
metadata. The code I use is similar to FilteredRDD implementation:

case class PageRowRDD(
   self: RDD[PageRow],
   @transient keys: ListSet[KeyLike] = ListSet()
){
  override def getPartitions: Array[Partition] =
firstParent[PageRow].partitions

  override val partitioner = self.partitioner

  override def compute(split: Partition, context: TaskContext) =
firstParent[PageRow].iterator(split, context)
}
However when I try to persist and reuse it in 2 transformations. My logs and
debug shows that it is being computed twice, rather than being reused in
memory.

The problem is: there is no such problem for FilteredRDD. How do I avoid
this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-extend-an-one-to-one-RDD-of-Spark-that-can-be-persisted-tp20394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to make sure a ClassPath is always shipped to workers?

2014-11-04 Thread Peng Cheng
Thanks a lot! Unfortunately this is not my problem: The page class is already
in the jar that is shipped to every worker. (I've logged into workers and
unpacked the jar files, and see the class file right there as intended)
Also, this error only happens sporadically, not every time. the error was
sometimes automatically retried on a different worker and get overcome (but
it won't be overcome if retried manually in the same partition), which make
it hard to catch.

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018p18107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
Sorry its a timeout duplicate, please remove it



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018p18020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
I have a spark application that deserialize an object 'Seq[Page]', save to
HDFS/S3, and read by another worker to be used elsewhere. The serialization
and deserialization use the same serializer as Spark itself. (Read from
SparkEnv.get.serializer.newInstance())

However I sporadically get this error:

java.lang.ClassNotFoundException: org.***.***.Page
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

It seems like Page class wasn't shipped with the Jar and executor and all
its information was erased in runtime.

The most weird thing: this error doesn't always happen, sometimes the old
Seq[Page] was get properly, sometimes it throws the exception, how could
this happen and how do I fix it?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
I have a spark application that deserialize an object 'Seq[Page]', save to
HDFS/S3, and read by another worker to be used elsewhere. The serialization
and deserialization use the same serializer as Spark itself. (Read from
SparkEnv.get.serializer.newInstance())

However I sporadically get this error:

java.lang.ClassNotFoundException: org.***.***.Page
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

It seems like Page class wasn't shipped with the Jar and executor and all
its information was erased in runtime.

The most weird thing: this error doesn't always happen, sometimes the old
Seq[Page] was get properly, sometimes it throws the exception, how could
this happen and how do I fix it?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks Jimmy.
I will have a try.
Thanks very much for your guys' help.

Best,
Peng

On Thu, Oct 30, 2014 at 8:19 PM, Jimmy  wrote:

> sampleRDD. cache()
>
> Sent from my iPhone
>
> On Oct 30, 2014, at 5:01 PM, peng xia  wrote:
>
> Hi Xiangrui,
>
> Can you give me some code example about caching, as I am new to Spark.
>
> Thanks,
> Best,
> Peng
>
> On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng  wrote:
>
>> Then caching should solve the problem. Otherwise, it is just loading
>> and parsing data from disk for each iteration. -Xiangrui
>>
>> On Thu, Oct 30, 2014 at 11:44 AM, peng xia  wrote:
>> > Thanks for all your help.
>> > I think I didn't cache the data. My previous cluster was expired and I
>> don't
>> > have a chance to check the load balance or app manager.
>> > Below is my code.
>> > There are 18 features for each record and I am using the Scala API.
>> >
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.SparkContext
>> > import org.apache.spark.SparkContext._
>> > import org.apache.spark.rdd._
>> > import org.apache.spark.mllib.classification.SVMWithSGD
>> > import org.apache.spark.mllib.regression.LabeledPoint
>> > import org.apache.spark.mllib.linalg.Vectors
>> > import java.util.Calendar
>> >
>> > object BenchmarkClassification {
>> > def main(args: Array[String]) {
>> > // Load and parse the data file
>> > val conf = new SparkConf()
>> >  .setAppName("SVM")
>> >  .set("spark.executor.memory", "8g")
>> >  // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
>> >val sc = new SparkContext(conf)
>> > val data = sc.textFile(args(0))
>> > val parsedData = data.map { line =>
>> >  val parts = line.split(',')
>> >  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
>> > x.toDouble)))
>> > }
>> > val testData = sc.textFile(args(1))
>> > val testParsedData = testData .map { line =>
>> >  val parts = line.split(',')
>> >  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
>> > x.toDouble)))
>> > }
>> >
>> > // Run training algorithm to build the model
>> > val numIterations = 20
>> > val model = SVMWithSGD.train(parsedData, numIterations)
>> >
>> > // Evaluate model on training examples and compute training error
>> > // val labelAndPreds = testParsedData.map { point =>
>> > //   val prediction = model.predict(point.features)
>> > //   (point.label, prediction)
>> > // }
>> > // val trainErr = labelAndPreds.filter(r => r._1 !=
>> r._2).count.toDouble /
>> > testParsedData.count
>> > // println("Training Error = " + trainErr)
>> > println(Calendar.getInstance().getTime())
>> > }
>> > }
>> >
>> >
>> >
>> >
>> > Thanks,
>> > Best,
>> > Peng
>> >
>> > On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng 
>> wrote:
>> >>
>> >> DId you cache the data and check the load balancing? How many
>> >> features? Which API are you using, Scala, Java, or Python? -Xiangrui
>> >>
>> >> On Thu, Oct 30, 2014 at 9:13 AM, Jimmy  wrote:
>> >> > Watch the app manager it should tell you what's running and taking
>> >> > awhile...
>> >> > My guess it's a "distinct" function on the data.
>> >> > J
>> >> >
>> >> > Sent from my iPhone
>> >> >
>> >> > On Oct 30, 2014, at 8:22 AM, peng xia  wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> >
>> >> >
>> >> > Previous we have applied SVM algorithm in MLlib to 5 million records
>> >> > (600
>> >> > mb), it takes more than 25 minutes to finish.
>> >> > The spark version we are using is 1.0 and we were running this
>> program
>> >> > on a
>> >> > 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
>> >> >
>> >> > The 5 million records only have two distinct records (One positive
>> and
>> >> > one
>> >> > negative), others are all duplications.
>> >> >
>> >> > Any one has any idea on why it takes so long on this small data?
>> >> >
>> >> >
>> >> >
>> >> > Thanks,
>> >> > Best,
>> >> >
>> >> > Peng
>> >
>> >
>>
>
>


Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi Xiangrui,

Can you give me some code example about caching, as I am new to Spark.

Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng  wrote:

> Then caching should solve the problem. Otherwise, it is just loading
> and parsing data from disk for each iteration. -Xiangrui
>
> On Thu, Oct 30, 2014 at 11:44 AM, peng xia  wrote:
> > Thanks for all your help.
> > I think I didn't cache the data. My previous cluster was expired and I
> don't
> > have a chance to check the load balance or app manager.
> > Below is my code.
> > There are 18 features for each record and I am using the Scala API.
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.SparkContext
> > import org.apache.spark.SparkContext._
> > import org.apache.spark.rdd._
> > import org.apache.spark.mllib.classification.SVMWithSGD
> > import org.apache.spark.mllib.regression.LabeledPoint
> > import org.apache.spark.mllib.linalg.Vectors
> > import java.util.Calendar
> >
> > object BenchmarkClassification {
> > def main(args: Array[String]) {
> > // Load and parse the data file
> > val conf = new SparkConf()
> >  .setAppName("SVM")
> >  .set("spark.executor.memory", "8g")
> >  // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
> >val sc = new SparkContext(conf)
> > val data = sc.textFile(args(0))
> > val parsedData = data.map { line =>
> >  val parts = line.split(',')
> >  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
> > x.toDouble)))
> > }
> > val testData = sc.textFile(args(1))
> > val testParsedData = testData .map { line =>
> >  val parts = line.split(',')
> >  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
> > x.toDouble)))
> > }
> >
> > // Run training algorithm to build the model
> > val numIterations = 20
> > val model = SVMWithSGD.train(parsedData, numIterations)
> >
> > // Evaluate model on training examples and compute training error
> > // val labelAndPreds = testParsedData.map { point =>
> > //   val prediction = model.predict(point.features)
> > //   (point.label, prediction)
> > // }
> > // val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble
> /
> > testParsedData.count
> > // println("Training Error = " + trainErr)
> > println(Calendar.getInstance().getTime())
> > }
> > }
> >
> >
> >
> >
> > Thanks,
> > Best,
> > Peng
> >
> > On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng  wrote:
> >>
> >> DId you cache the data and check the load balancing? How many
> >> features? Which API are you using, Scala, Java, or Python? -Xiangrui
> >>
> >> On Thu, Oct 30, 2014 at 9:13 AM, Jimmy  wrote:
> >> > Watch the app manager it should tell you what's running and taking
> >> > awhile...
> >> > My guess it's a "distinct" function on the data.
> >> > J
> >> >
> >> > Sent from my iPhone
> >> >
> >> > On Oct 30, 2014, at 8:22 AM, peng xia  wrote:
> >> >
> >> > Hi,
> >> >
> >> >
> >> >
> >> > Previous we have applied SVM algorithm in MLlib to 5 million records
> >> > (600
> >> > mb), it takes more than 25 minutes to finish.
> >> > The spark version we are using is 1.0 and we were running this program
> >> > on a
> >> > 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
> >> >
> >> > The 5 million records only have two distinct records (One positive and
> >> > one
> >> > negative), others are all duplications.
> >> >
> >> > Any one has any idea on why it takes so long on this small data?
> >> >
> >> >
> >> >
> >> > Thanks,
> >> > Best,
> >> >
> >> > Peng
> >
> >
>


Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks for all your help.
I think I didn't cache the data. My previous cluster was expired and I
don't have a chance to check the load balance or app manager.
Below is my code.
There are 18 features for each record and I am using the Scala API.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import java.util.Calendar

object BenchmarkClassification {
def main(args: Array[String]) {
// Load and parse the data file
val conf = new SparkConf()
  .setAppName("SVM")
  .set("spark.executor.memory", "8g")
  // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
val sc = new SparkContext(conf)
val data = sc.textFile(args(0))
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
x.toDouble)))
}
val testData = sc.textFile(args(1))
val testParsedData = testData .map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
x.toDouble)))
}

// Run training algorithm to build the model
val numIterations = 20
val model = SVMWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
// val labelAndPreds = testParsedData.map { point =>
//   val prediction = model.predict(point.features)
//   (point.label, prediction)
// }
// val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
testParsedData.count
// println("Training Error = " + trainErr)
println(Calendar.getInstance().getTime())
}
}




Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng  wrote:

> DId you cache the data and check the load balancing? How many
> features? Which API are you using, Scala, Java, or Python? -Xiangrui
>
> On Thu, Oct 30, 2014 at 9:13 AM, Jimmy  wrote:
> > Watch the app manager it should tell you what's running and taking
> awhile...
> > My guess it's a "distinct" function on the data.
> > J
> >
> > Sent from my iPhone
> >
> > On Oct 30, 2014, at 8:22 AM, peng xia  wrote:
> >
> > Hi,
> >
> >
> >
> > Previous we have applied SVM algorithm in MLlib to 5 million records (600
> > mb), it takes more than 25 minutes to finish.
> > The spark version we are using is 1.0 and we were running this program
> on a
> > 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
> >
> > The 5 million records only have two distinct records (One positive and
> one
> > negative), others are all duplications.
> >
> > Any one has any idea on why it takes so long on this small data?
> >
> >
> >
> > Thanks,
> > Best,
> >
> > Peng
>


issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi,



Previous we have applied SVM algorithm in MLlib to 5 million records (600
mb), it takes more than 25 minutes to finish.
The spark version we are using is 1.0 and we were running this program on a
4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.

The 5 million records only have two distinct records (One positive and one
negative), others are all duplications.

Any one has any idea on why it takes so long on this small data?



Thanks,
Best,

Peng


Re: Asynchronous Broadcast from driver to workers, is it possible?

2014-10-21 Thread Peng Cheng
Looks like the only way is to implement that feature. There is no way of
hacking it into working



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758p16985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Asynchronous Broadcast from driver to workers, is it possible?

2014-10-06 Thread Peng Cheng
Any suggestions? I'm thinking of submitting a feature request for mutable
broadcast. Is it doable?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758p15807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Asynchronous Broadcast from driver to workers, is it possible?

2014-10-04 Thread Peng Cheng
While Spark already offers support for asynchronous reduce (collect data from
workers, while not interrupting execution of a parallel transformation)
through accumulator, I have made little progress on making this process
reciprocal, namely, to broadcast data from driver to workers to be used by
all executors in the middle of a transformation. This primarily intended to
be used in downpour SGD/adagrad, a non-blocking concurrent machine learning
optimizer that performs better than existing synchronous GD in MLlib, and
have vast application in training of many models.

My attempt so far is to stick to out-of-the-box, immutable broadcast, open a
new thread on driver, in which I broadcast a thin data wrapper that when
deserialized, will insert into a mutable singleton that is already
replicated to all workers in the fat jar, this customized deserialization is
not hard, just overwrite readObject like this:

class AutoInsert(var value: Int) extends Serializable{

  WorkerReplica.last = value

  private def readObject(in: ObjectInputStream): Unit = {
in.defaultReadObject()
WorkerContainer.last = this.value
  }
}

Unfortunately it looks like the deserializtion is called lazily and won't do
anything before a worker use it (Broadcast[AutoInsert]), this is impossible
without waiting for workers' stage to be finished and broadcast again. I'm
wondering if I can 'hack' this thing into working? Or I'll have to write a
serious extension to broadcast component to enable changing the value.

Hope I can find like-minded on this forum because ML is a selling point of
Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[no subject]

2014-09-30 Thread PENG ZANG
Hi,

We have a cluster setup with spark 1.0.2 running 4 workers and 1 master
with 64G RAM for each. In the sparkContext we specify 32G executor memory.
However, as long as the task running longer than approximate 15 mins, all
the executors are lost just like some sort of timeout no matter if the task
is using up the memory. We tried to increase the spark.akka.timeout,
spark.akka.lookupTimeout, and spark.worker.timeout, but still no luck.
Besides, even we just start a sparkContext and sit there instead of "stop"
it, it will still error out with the exception below:

[error] o.a.s.s.TaskSchedulerImpl - Lost executor 0 on XXX06: remote Akka
client disassociated
[error] o.a.s.n.ConnectionManager - Corresponding SendingConnection to
ConnectionManagerId(XXX06.local,34307) not found
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 2 on XXX08: remote Akka
client disassociated
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 1 on XXX07: remote Akka
client disassociated
[error] o.a.s.n.SendingConnection - Exception while reading
SendingConnection to ConnectionManagerId(XXX1,56639)
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
~[na:1.7.0_60]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
~[na:1.7.0_60]
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
~[spark-core_2.10-1.1.0.jar:1.1.0]
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
[spark-core_2.10-1.1.0.jar:1.1.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_60]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_60]
[error] o.a.s.n.SendingConnection - Exception while reading
SendingConnection to ConnectionManagerId(XXX08.local,39914)
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
~[na:1.7.0_60]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
~[na:1.7.0_60]
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
~[spark-core_2.10-1.1.0.jar:1.1.0]
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
[spark-core_2.10-1.1.0.jar:1.1.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_60]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_60]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] ->
[akka.tcp://sparkExecutor@XXX06:60653]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX06:60653]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX06:60653]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX06/10.40.31.51:60653
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] ->
[akka.tcp://sparkExecutor@XXX06:61000]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX06:61000]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX06:61000]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX06/10.40.31.51:61000
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] ->
[akka.tcp://sparkExecutor@XXX08:52949]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX08:52949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX08:52949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX08/10.40.31.53:52949
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] ->
[akka.tcp://sparkExecutor@XXX08:36726]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX08:36726]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX08:36726]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX08/10.40.31.53:36726
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] ->
[akka.tcp://sparkExecutor@XXX07:46516]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX07:46516]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX07:46516]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX07/10.40.31.52:46516
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] ->
[akka.tcp://sparkExecutor@XXX07:48160]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX07:48160]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sp

Re: Invalid signature file digest for Manifest main attributes with spark job built using maven

2014-09-15 Thread Kevin Peng
Sean,

Thanks.  That worked.

Kevin

On Mon, Sep 15, 2014 at 3:37 PM, Sean Owen  wrote:

> This is more of a Java / Maven issue than Spark per se. I would use
> the shade plugin to remove signature files in your final META-INF/
> dir. As Spark does, in its :
>
> 
>   
> *:*
> 
>   org/datanucleus/**
>   META-INF/*.SF
>   META-INF/*.DSA
>   META-INF/*.RSA
> 
>   
> 
>
> On Mon, Sep 15, 2014 at 11:33 PM, kpeng1  wrote:
> > Hi All,
> >
> > I am trying to submit a spark job that I have built in maven using the
> > following command:
> > /usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain
> > --master local[1] /home/cloudera/myjar.jar 100
> >
> > But I seem to be getting the following error:
> > Exception in thread "main" java.lang.SecurityException: Invalid signature
> > file digest for Manifest main attributes
> > at
> >
> sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
> > at
> >
> sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
> > at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307)
> > at java.util.jar.JarVerifier.update(JarVerifier.java:218)
> > at java.util.jar.JarFile.initializeVerifier(JarFile.java:345)
> > at java.util.jar.JarFile.getInputStream(JarFile.java:412)
> > at
> sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
> > at sun.misc.Resource.cachedInputStream(Resource.java:77)
> > at sun.misc.Resource.getByteBuffer(Resource.java:160)
> > at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
> > at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:270)
> > at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289)
> > at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
> > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >
> >
> > Here is the pom file I am using to build the jar:
> > http://maven.apache.org/POM/4.0.0";
> > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
> > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > http://maven.apache.org/maven-v4_0_0.xsd";>
> >   4.0.0
> >   com.spark
> >   myjar
> >   0.0.1-SNAPSHOT
> >   ${project.artifactId}
> >   My wonderfull scala app
> >   2010
> >   
> > 
> >   My License
> >   http://
> >   repo
> > 
> >   
> >
> >   
> > cdh5.1.0
> > 1.6
> > 1.6
> > UTF-8
> > 2.10
> > 2.10.4
> >   
> >
> >   
> > 
> >   scala-tools.org
> >   Scala-tools Maven2 Repository
> >   https://oss.sonatype.org/content/repositories/snapshots/
> 
> > 
> > 
> >   maven-hadoop
> >   Hadoop Releases
> >
> > https://repository.cloudera.com/content/repositories/releases/
> 
> > 
> > 
> >   cloudera-repos
> >   Cloudera Repos
> >   https://repository.cloudera.com/artifactory/cloudera-repos/
> 
> > 
> >   
> >   
> > 
> >   scala-tools.org
> >   Scala-tools Maven2 Repository
> >   https://oss.sonatype.org/content/repositories/snapshots/
> 
> > 
> >   
> >
> >   
> > 
> >   org.scala-lang
> >   scala-library
> >   ${scala.version}
> > 
> > 
> >   org.apache.spark
> >   spark-core_2.10
> >   1.0.0-${cdh.version}
> > 
> > 
> >   org.apache.spark
> >   spark-tools_2.10
> >   1.0.0-${cdh.version}
> > 
> > 
> >   org.apache.spark
> >   spark-streaming-flume_2.10
> >   1.0.0-${cdh.version}
> > 
> > 
> >   org.apache.spark
> >   spark-streaming_2.10
> >   1.0.0-${cdh.version}
> > 
> > 
> >   org.apache.flume
> >   flume-ng-sdk
> >   1.5.0-${cdh.version}
> >
> >   
> > 
> >   io.netty
> >   netty
> > 
> >   
> > 
> > 
> >   org.apache.flume
> >   flume-ng-core
> >   1.5.0-${cdh.version}
> >
> >   
> > 
> >   io.netty
> >   netty
> > 
> >   
> > 
> > 
> >   org.apache.hbase
> >   hbase-client
> >   0.98.1-${cdh.version}
> >
> >   
> > 
> >   io.netty
> >   netty
> > 
> >   
> > 
> > 
> >   org.apache.hadoop
> >   hadoop-client
> >   2.3.0-${cdh.version}
> >
> > 
> >
> >
> > 
> >   junit
> >   junit
> >   4

Re: Crawler and Scraper with different priorities

2014-09-09 Thread Peng Cheng
Hi Sandeep,

would you be interesting in joining my open source project?

https://github.com/tribbloid/spookystuff

IMHO spark is indeed not for general purpose crawling, of which distributed
job is highly homogeneous. But good enough for directional scraping which
involves heterogeneous input and deep graph following & extraction. Please
drop me a line if you have a user case, as I'll try to integrate it as a
feature.

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-Scraper-with-different-priorities-tp13645p13838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming into HBase

2014-09-03 Thread Kevin Peng
Ted,

Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming
job 1409786463000 ms.0

org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)

at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

at akka.actor.ActorCell.invoke(ActorCell.scala:456)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

at akka.dispatch.Mailbox.run(Mailbox.scala:219)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Basically, what I am doing on the terminal where I run nc -lk, I type in
words separated by commas and hit enter i.e. "bill,ted".


On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu  wrote:

> Adding back user@
>
> I am not familiar with the NotSerializableException. Can you show the
> full stack trace ?
>
> See SPARK-1297 for changes you need to make so that Spark works with
> hbase 0.98
>
> Cheers
>
>
> On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng  wrote:
>
>> Ted,
>>
>> The hbase-site.xml is in the classpath (had worse issues before... until
>> I figured that it wasn't in the path).
>>
>> I get the following error in the spark-shell:
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> not serializable: java.io.NotSerializableException:
>> org.apache.spark.streaming.StreamingContext
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
>> ...
>>
>> I also double checked the hbase table, just in case, and nothing new is
>> written in there.
>>
>> I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
>> CDH5.1.0 distro.
>>
>> Thank you for the help.
>>
>>
>> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu  wrote:
>>
>>> Is hbase-site.xml in the classpath ?
>>> Do you observe any exception from the code below or in region server log
>>> ?
>>>
>>> Which hbase release are you using ?
>>>
>>>
>>> On Wed, Sep 3, 2014 at 2:05 PM, kpeng1  wrote:
>>>
>>>> I have been trying to understand how spark streaming and hbase connect,
>>>> but
>>>> have not been successful. What I am trying to do is given a spark
>>>> stream,
>>>> process that stream and store the results in an hbase table. So far
>>>> this is
>>>> what I have:
>>>>
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>> import org.apache.spark.streaming.StreamingContext._
>>>> import org.apache.spark.storage.StorageLevel
>>>> import org.apache.hadoop.hbase.HBaseConfiguration
>>>> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>
>>>> def blah(row: Array[String]) {
>>>>   val hConf = new HBaseConfiguration()
>>>>   val hTable = new HTable(hConf, "table")
>>>>   val thePut = new Put(Bytes.toBytes(row(0)))
>>>>   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
>>>> Bytes.toBytes(row(0)))
>>>>   hTable

Re: Bug or feature? Overwrite broadcasted variables.

2014-08-19 Thread Peng Cheng
Unfortunately, After some research I found its just a side effect of how
closure containing var works in scala:
http://stackoverflow.com/questions/11657676/how-does-scala-maintains-the-values-of-variable-when-the-closure-was-defined

the closure keep referring var broadcasted wrapper as a pointer, until it is
shipped to nodes, which is only triggered lazily. So, you can't do this
after shipping already started (e.g. change the broadcasted value in a new
thread when an action is running). It's neither a feature or bug, just an
illusion.

I would really like to see a non-blocking Broadcast.set() being implemented,
it makes a lot of stochastic algorithms easier to write.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
Yeah, Thanks a lot. I know for people understanding lazy execution this seems
straightforward. But for those who don't it may become a liability.

I've only tested its stability on a small example (which seems stable),
hopefully it's not a serendipity. Can a committer confirm this?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12348.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
I'm curious to see that if you declare broadcasted wrapper as a var, and
overwrite it in the driver program, the modification can have stable impact
on all transformations/actions defined BEFORE the overwrite but was executed
lazily AFTER the overwrite:

   val a = sc.parallelize(1 to 10)

var broadcasted = sc.broadcast("broad")

val b = a.map(_ + broadcasted.value)
//  b.persist()
for (line <- b.collect()) {  print(line)  }

println("\n===")
broadcasted = sc.broadcast("cast")

for (line <- b.collect()) {  print(line)  }

the result is:

1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
===
1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast

Of course, if you persist b before overwriting it will still get the
non-surprising result (both are 10broad... because they are persisted). This
can be useful sometimes but may cause confusion at other times (people can
no longer add persist at will just for backup because it may change the
result).

So far I've found no documentation supporting this feature. So can some one
confirm that its a feature craftly designed?

Yours Peng 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  1   2   >