[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-15 Thread linbojin
Github user linbojin closed the pull request at:

https://github.com/apache/spark/pull/16276





[GitHub] spark issue #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-15 Thread linbojin
Github user linbojin commented on the issue:

https://github.com/apache/spark/pull/16276
  
@rxin Thanks, I'm just starting to dive deep into Spark, and I hope I can contribute more
impactful things later 😄. I will close this one. @srowen I tried out your idea: I created a
`TraversableRDDFunctions` file with an `implicit def
rddToTraversableRDDFunctions[U](rdd: RDD[TraversableRDDFunctions[U]])` inside the
RDD companion object. It's not very complex, but the problem is that class RDD is
**invariant**, so it's hard to make this method generic.
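A minimal sketch of the enrichment pattern being discussed, modeled on Spark's `PairRDDFunctions` style; the names and exact signature here are illustrative only, and it runs into exactly the invariance problem mentioned above:
```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical enrichment class, in the style of PairRDDFunctions.
class TraversableRDDFunctions[U: ClassTag](self: RDD[TraversableOnce[U]]) extends Serializable {
  def flatten: RDD[U] = self.flatMap(identity)
}

object TraversableRDDFunctions {
  // Because RDD[T] is invariant in T, an RDD[List[Int]] is *not* an
  // RDD[TraversableOnce[Int]], so this conversion never applies to it
  // without an extra implicit view -- the genericity problem noted above.
  implicit def rddToTraversableRDDFunctions[U: ClassTag](
      rdd: RDD[TraversableOnce[U]]): TraversableRDDFunctions[U] =
    new TraversableRDDFunctions(rdd)
}
```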






[GitHub] spark issue #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-15 Thread linbojin
Github user linbojin commented on the issue:

https://github.com/apache/spark/pull/16276
  
@srowen Are there any problems with my current implementation, i.e. using
`implicit asTraversable: T => TraversableOnce[U]`? I modeled it on the `flatten`
implementation in the Scala source code:
https://github.com/scala/scala/blob/05016d9035ab9b1c866bd9f12fdd0491f1ea0cbb/src/library/scala/collection/generic/GenericTraversableTemplate.scala#L169
If a user calls flatten on an RDD whose elements are not traversable, it simply errors out at compile time:
```
scala> val rdd = sc.parallelize(Seq(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.flatten
<console>:27: error: No implicit view available from Int => scala.collection.TraversableOnce[U].
       rdd.flatten
       ^
```
The same happens in plain Scala:
```
scala> val l = List(1,2,3)
l: List[Int] = List(1, 2, 3)

scala> l.flatten
<console>:13: error: No implicit view available from Int => scala.collection.GenTraversableOnce[B].
       l.flatten
       ^
```
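As a self-contained illustration of the same "implicit view as evidence" pattern outside Spark, here is a purely hypothetical `Box` container; `flatten` is only callable when the element type can be viewed as a collection:
```scala
// Toy container (not Spark code) demonstrating the evidence-based flatten pattern.
class Box[T](val elems: Seq[T]) {
  def flatten[U](implicit asTraversable: T => TraversableOnce[U]): Box[U] =
    new Box(elems.flatMap(asTraversable(_)))
}

new Box(Seq(Seq(1, 2), Seq(3))).flatten   // works: Seq[Int] can be viewed as TraversableOnce[Int]
// new Box(Seq(1, 2, 3)).flatten          // does not compile: no view Int => TraversableOnce[U]
```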





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92550699
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return a new RDD by flattening all elements from an RDD with traversable elements.
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

@srowen I think I figured out a simpler way:
```
  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => {
      var newIter: Iterator[U] = Iterator.empty
      for (x <- iter) newIter ++= asTraversable(x)
      newIter
    })
  }
```
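For comparison only (not part of the patch), the same per-partition logic can be sketched with `Iterator.flatMap`, which stays lazy and avoids chaining many concatenated iterators together:
```scala
// Sketch of an equivalent body using Iterator.flatMap instead of repeated ++=.
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(asTraversable))
```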





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92546374
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return a new RDD by flattening all elements from an RDD with traversable elements.
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

Hi @srowen, thanks for your suggestion. One option is to use Scala's flatMap, as follows:
```
  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
    val f = (x: T) => asTraversable(x)
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
```
Or I can implement the logic myself:
```
  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => new Iterator[U] {
      private val empty = Iterator.empty
      private var cur: Iterator[U] = empty
      private def nextCur() { cur = asTraversable(iter.next()).toIterator }
      def hasNext: Boolean = {
        while (!cur.hasNext) {
          if (!iter.hasNext) return false
          nextCur()
        }
        true
      }
      def next(): U = (if (hasNext) cur else empty).next()
    })
  }
```
ref: https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/Iterator.scala#L432

Which one do you think is better?
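A quick sanity check in plain Scala (no Spark involved) that `Iterator.flatMap` already provides the lazy "concatenate on demand" behavior the hand-written iterator above reimplements:
```scala
// Plain Scala: Iterator.flatMap lazily flattens the elements of a partition-style iterator.
val parts = Iterator(List(1, 2), List(3), List(4, 5))
assert(parts.flatMap(_.iterator).toList == List(1, 2, 3, 4, 5))
```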






[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92531327
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -88,6 +88,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
 }
   }
 
+  test("flatten") {
+    val nums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2)
+    assert(nums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6))
+    val strs = sc.makeRDD(Array(Array("a", "b", "c"), Array("d", "e"), Array("f")), 2)
+    assert(strs.flatten.collect().toList === List("a", "b", "c", "d", "e", "f"))
--- End diff --

Thanks, I will move the test code into "basic operations".





[GitHub] spark issue #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on the issue:

https://github.com/apache/spark/pull/16276
  
cc @rxin





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-13 Thread linbojin
GitHub user linbojin opened a pull request:

https://github.com/apache/spark/pull/16276

[SPARK-18855][CORE] Add RDD flatten function

## What changes were proposed in this pull request?

Added a new flatten function for RDD.

## How was this patch tested?

Unit tests inside RDDSuite and manual tests:
```
scala> val rdd = sc.makeRDD(List(List(1, 2, 3), List(4, 5), List(6)))
rdd: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.flatten.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
```
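For reference (not part of this patch), the same result is already expressible with `flatMap` and an identity function; the new method mainly adds the collection-style shorthand:
```
scala> rdd.flatMap(x => x).collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)
```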


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/linbojin/spark SPARK-18855-add-rdd-flatten

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16276


commit 2c0903ac07367cf203e4b1ed6bf4ac1894976ec9
Author: linbojin <linbojin...@gmail.com>
Date:   2016-12-14T06:04:48Z

add RDD flatten function and tests







[GitHub] spark pull request #16132: [MINOR] [README] Correct Markdown link inside rea...

2016-12-03 Thread linbojin
GitHub user linbojin opened a pull request:

https://github.com/apache/spark/pull/16132

[MINOR] [README] Correct Markdown link inside readme

## What changes were proposed in this pull request?

"Useful Developer Tools" link inside 
[README.md](https://github.com/apache/spark/blob/master/README.md#building-spark) doesn't work on the master branch. This PR corrects the Markdown link.

## How was this patch tested?


[README.md](https://github.com/linbojin/spark/blob/fix-markdown-link-in-readme/README.md#building-spark) on this branch

![image](https://cloud.githubusercontent.com/assets/5894707/20864124/4c83499e-ba1e-11e6-9948-07b4627f516f.png)

@srowen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/linbojin/spark fix-markdown-link-in-readme

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16132


commit a6c6dc75a6bd2b05f0619f5677b770d036487c87
Author: linbojin <linbojin...@gmail.com>
Date:   2016-12-04T04:30:35Z

correct Markdown link inside README







[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...

2016-08-15 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14645#discussion_r74737989
  
--- Diff: docs/quick-start.md ---
@@ -40,7 +40,7 @@ RDDs have _[actions](programming-guide.html#actions)_, which return values, and
 
 {% highlight scala %}
 scala> textFile.count() // Number of items in this RDD
-res0: Long = 126
+res0: Long = 99
--- End diff --

OK, I changed it to 15.





[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...

2016-08-15 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14645#discussion_r74737117
  
--- Diff: docs/quick-start.md ---
@@ -40,7 +40,7 @@ RDDs have _[actions](programming-guide.html#actions)_, which return values, and
 
 {% highlight scala %}
 scala> textFile.count() // Number of items in this RDD
-res0: Long = 126
+res0: Long = 99
--- End diff --

From http://spark.apache.org/docs/latest/quick-start.html
without cache
![screen shot 2016-08-15 at 17 12 33](https://cloud.githubusercontent.com/assets/5894707/17660363/8693aeea-630b-11e6-93b2-09c438e8f77f.png)
and with cache
![screen shot 2016-08-15 at 17 12 44](https://cloud.githubusercontent.com/assets/5894707/17660387/a58635ac-630b-11e6-8541-36003860ed5e.png)






[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...

2016-08-15 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14645#discussion_r74733368
  
--- Diff: docs/quick-start.md ---
@@ -40,7 +40,7 @@ RDDs have _[actions](programming-guide.html#actions)_, which return values, and
 
 {% highlight scala %}
 scala> textFile.count() // Number of items in this RDD
-res0: Long = 126
+res0: Long = 99
--- End diff --

OK, I will just comment these. But for the bug mentioned above (the counts with and
without cache don't match), do I need to fix it?





[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...

2016-08-15 Thread linbojin
GitHub user linbojin opened a pull request:

https://github.com/apache/spark/pull/14645

[MINOR] [DOC] Correct code snippet results in quick start documentation

## What changes were proposed in this pull request?

As the README.md file is updated over time, some code snippet outputs are no longer
correct relative to the new README.md. For example:
```
scala> textFile.count()
res0: Long = 126
```
should be
```
scala> textFile.count()
res0: Long = 99
```
This PR corrects these outputs so that new Spark learners have an accurate reference.
It also fixes a small inconsistency: in the current documentation, the outputs of
linesWithSpark.count() without and with cache are different (one is 15 and the other is 19):
```
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

...

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 19
```

## How was this patch tested?

manual test:  run `$ SKIP_API=1 jekyll serve --watch`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/linbojin/spark quick-start-documentation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14645.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14645


commit f093e3a44a6447f619edd987bf30ee838899c578
Author: linbojin <linbojin...@gmail.com>
Date:   2016-08-15T06:26:39Z

correct result numbers inside quick start docs based on new README.md file







[GitHub] spark pull request: [MINOR][SQL][DOCS] Add notes of the determinis...

2016-05-23 Thread linbojin
Github user linbojin commented on the pull request:

https://github.com/apache/spark/pull/13087#issuecomment-221148743
  
@marmbrus @dongjoon-hyun I will add the detailed description to the old SPARK-15282 JIRA issue.





[GitHub] spark pull request: [SPARK-15282][SQL] PushDownPredicate should no...

2016-05-22 Thread linbojin
Github user linbojin commented on the pull request:

https://github.com/apache/spark/pull/13087#issuecomment-220837854
  
@dongjoon-hyun @cloud-fan @marmbrus Thanks for your discussions about my 
reported issue: [SPARK-15282](https://issues.apache.org/jira/browse/SPARK-15282)

Maybe I should describe our use case in more detail. In our project, we first generate a
DataFrame with one column called "fileName" and one column called "url", and then we use a
UDF (inside withColumn()) to download the files from the corresponding URLs and filter out
'{}' data before writing to HDFS:

```scala
// df: DataFrame["fileName", "url"]
val getDataUDF = udf((url: String) => {
  try {
    scala.io.Source.fromURL(url).mkString  // simplified stand-in for the real download logic
  } catch { case e: Exception =>
    "{}"
  }
})
val df2 = df.withColumn("data", getDataUDF(df("url")))
  .filter("data <> '{}'")
df2.write.save("hdfs path")
```

Based on our logs, each file is downloaded twice. As for running time, the write job with the
filter takes about twice as long as the one without it:
![screen shot 2016-05-22 at 22 19 24](https://cloud.githubusercontent.com/assets/5894707/15454461/c13c9918-206b-11e6-8901-1f473fbae3ca.png)
![screen shot 2016-05-22 at 22 18 02](https://cloud.githubusercontent.com/assets/5894707/15454462/c8d4c1a0-206b-11e6-88cc-8ad91d121b6e.png)
Left is with `.filter("data <> '{}'")` and right is without `.filter("data <> '{}'")`. As you can
imagine, when there are many URLs or the files are very large, the reported issue affects
performance a lot.

Another problem is data correctness. Because each file is downloaded twice, we came across
cases where the first download (getDataUDF) returns real data (not '{}') but the second
download returns '{}' because of a connection exception. The filter only sees the first
returned value, so Spark does not remove the row, yet the value stored in the "data" column
is '{}', i.e. the second returned value. Even after the filter, the resulting DataFrame df2
looks like the following (rows with '{}' data that should have been removed):
```
fileName  url   data
file1     url1  sth
files     url2  {}
```

**So at a high level, we get '{}' data even after filtering out '{}', which is strange. The
reason, I think, is that the UDF is executed twice when filtering on a new column created by
withColumn, and the two returned values are different: the first one makes the filter
condition true and the second one makes it false. The DataFrame keeps the second value,
which in fact should not appear after the filter operation.**

Finally, I removed the filter operation (and filter out '{}' further downstream instead)
because I think it may not be correct to filter on a new column created by withColumn. I
agree with @cloud-fan and @thunterdb: we can just document this behavior of UDFs, and users
should avoid using UDFs in this way.




[GitHub] spark pull request: [SPARK-15282][SQL] PushDownPredicate should no...

2016-05-19 Thread linbojin
Github user linbojin commented on the pull request:

https://github.com/apache/spark/pull/13087#issuecomment-220502400
  
Hi, @marmbrus
Could you review this PR?

