[ANNOUNCE] Apache Kyuubi released 1.9.0

2024-03-18 Thread Binjie Yang
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.9.0 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.
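
As a quick illustration of what that gateway looks like from a client's point of view, here is a minimal sketch in Python. It is not taken from the release notes: it assumes a Kyuubi server listening on the default Thrift binary port 10009 and the third-party PyHive package installed, and relies on the fact that Kyuubi exposes a HiveServer2-compatible Thrift endpoint, so any Hive JDBC/ODBC/Thrift client can connect the same way.

```
from pyhive import hive

# Connect to a (hypothetical) local Kyuubi server; 10009 is Kyuubi's default
# Thrift binary frontend port, and the protocol is HiveServer2-compatible.
conn = hive.connect(host="localhost", port=10009, username="demo")
cursor = conn.cursor()

# The SQL is executed by the Spark SQL engine that Kyuubi launches for this user.
cursor.execute("SELECT 1 AS probe")
print(cursor.fetchall())
```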

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.9.0.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
On behalf of Apache Kyuubi community


Fwd: the life cycle of a ShuffleDependency

2023-12-27 Thread yang chen
Hi, I'm learning Spark and wondering when shuffle data gets deleted. I found the
ContextCleaner class, which cleans the shuffle data when the shuffle dependency
is GC-ed. Based on the source code, the shuffle dependency is GC-ed only when
the active job finishes, but I'm not sure. Could you explain the life cycle of a
ShuffleDependency?
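
A minimal PySpark sketch of the lifecycle being asked about (local master, sizes and names are illustrative): the ShuffleDependency stays alive as long as some RDD in a live lineage references it, its shuffle output can be reused across jobs, and ContextCleaner only deletes the shuffle files after the driver-side objects become unreachable and are garbage collected.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("shuffle-lifecycle").getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize(range(1_000_000)).map(lambda x: (x % 10, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)   # creates a ShuffleDependency
counts.count()   # first job writes the shuffle files
counts.count()   # second job reuses the same shuffle output (the map stage is skipped)

# Dropping the last references makes the dependency eligible for GC on the driver;
# ContextCleaner then asynchronously asks the executors to delete the shuffle files.
counts = None
pairs = None
sc._jvm.java.lang.System.gc()   # only a hint; cleanup timing is not deterministic
```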


Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
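
For reference, here is a minimal sketch of the coalesce() alternative Raghavendra suggests further down the thread; it reuses `table`, APP_NAME and CREATED_DATE from the snippet quoted below, and the partition count 4 is only an illustrative value. coalesce(N) merges existing partitions without a full shuffle, so it is usually the cheaper way to cap the number of files written per micro-batch, while repartition(N) forces a shuffle to exactly N partitions.

```
import os

# `table`, APP_NAME and CREATED_DATE are the same objects as in the code below.
query = (
    table
    .coalesce(4)          # illustrative: at most 4 writer tasks per micro-batch
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```

Note that with partitionBy(CREATED_DATE) the actual file count per micro-batch also depends on how many distinct dates each batch touches, so compaction may still be needed for late data.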

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
 wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the 
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of 
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong 
>  wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>> spark.readStream.format("kafka")
>> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>> .option("subscribe", os.environ["KAFKA_TOPIC"])
>> .load()
>> )
>>
>> table = (
>> df.select(
>> from_protobuf(
>> "value", "table", "/opt/protobuf-desc/table.desc"
>> ).alias("msg")
>> )
>> .withColumn("uuid", col("msg.uuid"))
>> # etc other columns...
>>
>> # generated column for daily partitioning in Delta Lake
>> .withColumn(CREATED_DATE,
>> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>> .drop("msg")
>> )
>>
>> query = (
>> table
>> .repartition(10).writeStream
>> .queryName(APP_NAME)
>> .outputMode("append")
>> .format("delta")
>> .partitionBy(CREATED_DATE)
>> .option("checkpointLocation", os.environ["CHECKPOINT"])
>> .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the
.repartition() parameter.

We are using Spark Streaming on our data pipeline to consume messages
and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column
as a daily partitioning, and save these records to Delta Lake. We have
60 Kafka partitions on the Kafka topic, 15 Spark executor instances
(so 4 Kafka partitions per executor).

How then, should we use .repartition()? Should we omit this parameter?
Or set it to 15? or 4?

Our code looks roughly like the below:

```
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
.option("subscribe", os.environ["KAFKA_TOPIC"])
.load()
)

table = (
df.select(
from_protobuf(
"value", "table", "/opt/protobuf-desc/table.desc"
).alias("msg")
)
.withColumn("uuid", col("msg.uuid"))
# etc other columns...

# generated column for daily partitioning in Delta Lake
.withColumn(CREATED_DATE,
date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
.drop("msg")
)

query = (
table
.repartition(10).writeStream
.queryName(APP_NAME)
.outputMode("append")
.format("delta")
.partitionBy(CREATED_DATE)
.option("checkpointLocation", os.environ["CHECKPOINT"])
.start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Write Spark Connection client application in Go

2023-09-14 Thread bo yang
Thanks Holden and Martin for the nice words and feedback :)

On Wed, Sep 13, 2023 at 8:22 AM Martin Grund  wrote:

> This is absolutely awesome! Thank you so much for dedicating your time to
> this project!
>
>
> On Wed, Sep 13, 2023 at 6:04 AM Holden Karau  wrote:
>
>> That’s so cool! Great work y’all :)
>>
>> On Tue, Sep 12, 2023 at 8:14 PM bo yang  wrote:
>>
>>> Hi Spark Friends,
>>>
>>> Anyone interested in using Golang to write Spark application? We created
>>> a Spark Connect Go Client library
>>> <https://github.com/apache/spark-connect-go>. Would love to hear
>>> feedback/thoughts from the community.
>>>
>>> Please see the quick start guide
>>> <https://github.com/apache/spark-connect-go/blob/master/quick-start.md>
>>> about how to use it. Following is a very short Spark Connect application in
>>> Go:
>>>
>>> func main() {
>>> spark, _ := 
>>> sql.SparkSession.Builder.Remote("sc://localhost:15002").Build()
>>> defer spark.Stop()
>>>
>>> df, _ := spark.Sql("select 'apple' as word, 123 as count union all 
>>> select 'orange' as word, 456 as count")
>>> df.Show(100, false)
>>> df.Collect()
>>>
>>> df.Write().Mode("overwrite").
>>> Format("parquet").
>>> Save("file:///tmp/spark-connect-write-example-output.parquet")
>>>
>>> df = spark.Read().Format("parquet").
>>> Load("file:///tmp/spark-connect-write-example-output.parquet")
>>> df.Show(100, false)
>>>
>>> df.CreateTempView("view1", true, false)
>>> df, _ = spark.Sql("select count, word from view1 order by count")
>>> }
>>>
>>>
>>> Many thanks to Martin, Hyukjin, Ruifeng and Denny for creating and
>>> working together on this repo! Welcome more people to contribute :)
>>>
>>> Best,
>>> Bo
>>>
>>>


Write Spark Connection client application in Go

2023-09-12 Thread bo yang
Hi Spark Friends,

Anyone interested in using Golang to write Spark application? We created
a Spark Connect Go Client library
<https://github.com/apache/spark-connect-go>.
Would love to hear feedback/thoughts from the community.

Please see the quick start guide
<https://github.com/apache/spark-connect-go/blob/master/quick-start.md>
about how to use it. Following is a very short Spark Connect application in
Go:

func main() {
    spark, _ := sql.SparkSession.Builder.Remote("sc://localhost:15002").Build()
    defer spark.Stop()

    df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
    df.Show(100, false)
    df.Collect()

    df.Write().Mode("overwrite").
        Format("parquet").
        Save("file:///tmp/spark-connect-write-example-output.parquet")

    df = spark.Read().Format("parquet").
        Load("file:///tmp/spark-connect-write-example-output.parquet")
    df.Show(100, false)

    df.CreateTempView("view1", true, false)
    df, _ = spark.Sql("select count, word from view1 order by count")
}


Many thanks to Martin, Hyukjin, Ruifeng and Denny for creating and working
together on this repo! Welcome more people to contribute :)

Best,
Bo


Re: [ANNOUNCE] Apache Spark 3.2.3 released

2022-11-30 Thread Yang,Jie(INF)
Thanks, Chao!

From: Maxim Gekk 
Date: Wednesday, November 30, 2022, 19:40
To: Jungtaek Lim 
Cc: Wenchen Fan , Chao Sun , dev , user 
Subject: Re: [ANNOUNCE] Apache Spark 3.2.3 released

Thank you, Chao!

On Wed, Nov 30, 2022 at 12:42 PM Jungtaek Lim 
mailto:kabhwan.opensou...@gmail.com>> wrote:
Thanks Chao for driving the release!

On Wed, Nov 30, 2022 at 6:03 PM Wenchen Fan 
mailto:cloud0...@gmail.com>> wrote:
Thanks, Chao!

On Wed, Nov 30, 2022 at 1:33 AM Chao Sun 
mailto:sunc...@apache.org>> wrote:
We are happy to announce the availability of Apache Spark 3.2.3!

Spark 3.2.3 is a maintenance release containing stability fixes. This
release is based on the branch-3.2 maintenance branch of Spark. We strongly
recommend all 3.2 users to upgrade to this stable release.

To download Spark 3.2.3, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-2-3.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Chao

-
To unsubscribe e-mail: 
dev-unsubscr...@spark.apache.org


Re: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-03 Thread bo yang
Interesting discussion here; it looks like Spark does not support configuring
a different number of executors in different stages. Would love to see the
community come out with such a feature.

On Thu, Nov 3, 2022 at 9:10 AM Shay Elbaz  wrote:

> Thanks again Artemis, I really appreciate it. I have watched the video
> but did not find an answer.
>
> Please bear with me just one more iteration 
>
> Maybe I'll be more specific:
> Suppose I start the application with maxExecutors=500, executors.cores=2,
> because that's the amount of resources needed for the ETL part. But for the
> DL part I only need 20 GPUs. SLS API only allows to set the resources per
> executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I
> configure the profile with 1 GPU per executor.
> So, the question is how do I limit the stage resources to 20 GPUs total?
>
> Thanks again,
> Shay
>
> --
> *From:* Artemis User 
> *Sent:* Thursday, November 3, 2022 5:23 PM
> *To:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Re: Stage level scheduling - lower the number
> of executors when using GPUs
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>   Shay,  You may find this video helpful (with some API code samples that
> you are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc&t=171s.
> The issue here isn't how to limit the number of executors but to request
> for the right GPU-enabled executors dynamically.  Those executors used in
> pre-GPU stages should be returned back to resource managers with dynamic
> resource allocation enabled (and with the right DRA policies).  Hope this
> helps..
>
> Unfortunately there isn't a lot of detailed docs for this topic since GPU
> acceleration is kind of new in Spark (not straightforward like in TF).   I
> wish the Spark doc team could provide more details in the next release...
>
> On 11/3/22 2:37 AM, Shay Elbaz wrote:
>
> Thanks Artemis. We are *not* using Rapids, but rather using GPUs through
> the Stage Level Scheduling feature with ResourceProfile. In Kubernetes
> you have to turn on shuffle tracking for dynamic allocation, anyhow.
> The question is how we can limit the *number of executors *when building
> a new ResourceProfile, directly (API) or indirectly (some advanced
> workaround).
>
> Thanks,
> Shay
>
>
> --
> *From:* Artemis User  
> *Sent:* Thursday, November 3, 2022 1:16 AM
> *To:* user@spark.apache.org 
> 
> *Subject:* [EXTERNAL] Re: Stage level scheduling - lower the number of
> executors when using GPUs
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>   Are you using Rapids for GPU support in Spark?  Couple of options you
> may want to try:
>
>1. In addition to dynamic allocation turned on, you may also need to
>turn on external shuffling service.
>2. Sounds like you are using Kubernetes.  In that case, you may also
>need to turn on shuffle tracking.
>3. The "stages" are controlled by the APIs.  The APIs for dynamic
>resource request (change of stage) do exist, but only for RDDs (e.g.
>TaskResourceRequest and ExecutorResourceRequest).
>
>
> On 11/2/22 11:30 AM, Shay Elbaz wrote:
>
> Hi,
>
> Our typical applications need less *executors* for a GPU stage than for a
> CPU stage. We are using dynamic allocation with stage level scheduling, and
> Spark tries to maximize the number of executors also during the GPU stage,
> causing a bit of resources chaos in the cluster. This forces us to use a
> lower value for 'maxExecutors' in the first place, at the cost of the CPU
> stages performance. Or try to solve this in the Kubernets scheduler level,
> which is not straightforward and doesn't feel like the right way to go.
>
> Is there a way to effectively use less executors in Stage Level
> Scheduling? The API does not seem to include such an option, but maybe
> there is some more advanced workaround?
>
> Thanks,
> Shay
>
>
>
>
>
>
>
>
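
For reference, a minimal PySpark sketch of the stage-level scheduling API discussed in this thread (resource amounts and the GPU discovery script path are illustrative assumptions). It only shapes what each executor and task of the GPU stage requests; the total executor count is still governed by the dynamic allocation settings such as spark.dynamicAllocation.maxExecutors, which is exactly the limitation raised above.

```
from pyspark.sql import SparkSession
from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                              ResourceProfileBuilder)

spark = SparkSession.builder.appName("gpu-stage-sketch").getOrCreate()
sc = spark.sparkContext

# Per-executor and per-task resources for the GPU stage (illustrative values).
exec_reqs = (ExecutorResourceRequests()
             .cores(2)
             .resource("gpu", 1, discoveryScript="/opt/spark/scripts/getGpus.sh"))
task_reqs = TaskResourceRequests().cpus(1).resource("gpu", 1)
gpu_profile = ResourceProfileBuilder().require(exec_reqs).require(task_reqs).build  # .build is a property

etl_rdd = sc.parallelize(range(10_000), numSlices=200)        # CPU-heavy ETL stage
dl_rdd = etl_rdd.repartition(20).withResources(gpu_profile)   # GPU stage uses the profile
dl_rdd.map(lambda x: x * 2).count()                           # needs dynamic allocation on YARN/K8s
```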


Re: [ANNOUNCE] Apache Spark 3.3.1 released

2022-10-26 Thread Yang,Jie(INF)
Thanks Yuming and all developers ~

Yang Jie

From: Maxim Gekk 
Date: Wednesday, October 26, 2022, 15:19
To: Hyukjin Kwon 
Cc: "L. C. Hsieh" , Dongjoon Hyun , Yuming Wang , dev , User 
Subject: Re: [ANNOUNCE] Apache Spark 3.3.1 released

Congratulations everyone with the new release, and thanks to Yuming for his 
efforts.

Maxim Gekk

Software Engineer

Databricks, Inc.


On Wed, Oct 26, 2022 at 10:14 AM Hyukjin Kwon 
mailto:gurwls...@gmail.com>> wrote:
Thanks, Yuming.

On Wed, 26 Oct 2022 at 16:01, L. C. Hsieh 
mailto:vii...@gmail.com>> wrote:
Thank you for driving the release of Apache Spark 3.3.1, Yuming!

On Tue, Oct 25, 2022 at 11:38 PM Dongjoon Hyun 
mailto:dongjoon.h...@gmail.com>> wrote:
>
> It's great. Thank you so much, Yuming!
>
> Dongjoon
>
> On Tue, Oct 25, 2022 at 11:23 PM Yuming Wang 
> mailto:wgy...@gmail.com>> wrote:
>>
>> We are happy to announce the availability of Apache Spark 3.3.1!
>>
>> Spark 3.3.1 is a maintenance release containing stability fixes. This
>> release is based on the branch-3.3 maintenance branch of Spark. We strongly
>> recommend all 3.3 users to upgrade to this stable release.
>>
>> To download Spark 3.3.1, head over to the download page:
> >> https://spark.apache.org/downloads.html
>>
>> To view the release notes:
> >> https://spark.apache.org/releases/spark-release-3-3-1.html
>>
>> We would like to acknowledge all community members for contributing to this
>> release. This release would not have been possible without you.
>>
>>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: [Java 17] --add-exports required?

2022-06-23 Thread Yang,Jie(INF)
So the above issue occurs when building and testing a Maven project with Spark 3.3.0
and Java 17, rather than when testing the spark-3.3 source code?

If yes, you may need to add the following Java Options to `argLine` of 
`maven-surefire-plugin` for Java 17:

```
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
```

These options are what we use to make all Spark UTs pass, but you may not need all of them.

However, these options don't need to be explicitly added when using spark-shell, spark-sql
or spark-submit, though others may need to be added as needed for Java 17.

Maybe some instructions should be added to the documentation.

Yang Jie





From: Greg Kopff 
Date: Thursday, June 23, 2022, 14:11
To: "Yang,Jie(INF)" 
Cc: "user@spark.apache.org" 
Subject: Re: [Java 17] --add-exports required?

Hi.

I am running on macOS 12.4, using an ‘Adoptium’ JDK from
https://adoptium.net/download. The version details are:

$ java -version
openjdk version "17.0.3" 2022-04-19
OpenJDK Runtime Environment Temurin-17.0.3+7 (build 17.0.3+7)
OpenJDK 64-Bit Server VM Temurin-17.0.3+7 (build 17.0.3+7, mixed mode, sharing)
I have attached an example maven project which demonstrates the error.



If you run 'mvn clean test' it should fail with:

[ERROR] ExampleTest  Time elapsed: 1.194 s  <<< ERROR!
java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in 
unnamed module @0x41a962cf) cannot access class sun.nio.ch.DirectBuffer (in 
module java.base) because module java.base does not export 
sun.nio.ch to unnamed module @0x41a962cf

Some of the diagnostic output from running with Maven with the -X flag is:

Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /usr/local/apache-maven/apache-maven-3.8.6
Java version: 17.0.3, vendor: Eclipse Adoptium, runtime: 
/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home
Default locale: en_AU, platform encoding: UTF-8
OS name: "mac os x", version: "12.4", arch: "x86_64", family: “mac"

[DEBUG] boot(compact) classpath:  surefire-booter-3.0.0-M7.jar  
surefire-api-3.0.0-M7.jar  surefire-logger-api-3.0.0-M7.jar  
surefire-shared-utils-3.0.0-M7.jar  surefire-extensions-spi-3.0.0-M7.jar  
test-classes  classes  junit-4.13.2.jar  hamcrest-core-1.3.jar  
hamcrest-all-1.3.jar  spark-core_2.12-3.3.0.jar  avro-1.11.0.jar  
jackson-core-2.12.5.jar  commons-compress-1.21.jar  avro-mapred-1.11.0.jar  
avro-ipc-1.11.0.jar  xz-1.9.jar  chill_2.12-0.10.0.jar  kryo-shaded-4.0.2.jar  
minlog-1.3.0.jar  objenesis-2.5.1.jar  chill-java-0.10.0.jar  
xbean-asm9-shaded-4.20.jar  hadoop-client-api-3.3.2.jar  
hadoop-client-runtime-3.3.2.jar  commons-logging-1.1.3.jar  
spark-launcher_2.12-3.3.0.jar  spark-kvstore_2.12-3.3.0.jar  
leveldbjni-all-1.8.jar  jackson-annotations-2.13.3.jar  
spark-network-common_2.12-3.3.0.jar  tink-1.6.1.jar  gson-2.8.6.jar  
spark-network-shuffle_2.12-3.3.0.jar  spark-unsafe_2.12-3.3.0.jar  
activation-1.1.1.jar  curator-recipes-2.13.0.jar  curator-framework-2.13.0.jar  
curator-client-2.13.0.jar  guava-16.0.1.jar  zookeeper-3.6.2.jar  
commons-lang-2.6.jar  zookeeper-jute-3.6.2.jar  audience-annotations-0.5.0.jar  
jakarta.servlet-api-4.0.3.jar  commons-codec-1.15.jar  commons-lang3-3.12.0.jar 
 commons-math3-3.6.1.jar  commons-text-1.9.jar  commons-io-2.11.0.jar  
commons-collections-3.2.2.jar  commons-collections4-4.4.jar  jsr305-3.0.0.jar  
slf4j-api-1.7.32.jar  jul-to-slf4j-1.7.32.jar  jcl-over-slf4j-1.7.32.jar  
log4j-slf4j-impl-2.17.2.jar  log4j-api-2.17.2.jar  log4j-core-2.17.2.jar  
log4j-1.2-api-2.17.2.jar  compress-lzf-1.1.jar  snappy-java-1.1.8.4.jar  
lz4-java-1.8.0.jar  zstd-jni-1.5.2-1.jar  RoaringBitmap-0.9.25.jar  
shims-0.9.25.jar  scala-xml_2.12-1.2.0.jar  scala-library-2.12.15.jar  
scala-reflect-2.12.15.jar  json4s-jackson_2.12-3.7.0-M11.jar  
json4s-core_2.12-3.7.0-M11.jar  json4s-ast_2.12-3.7.0-M11.jar  
json4s-scalap_2.12-3.7.0-M11.jar  jersey-client-2.34.jar  
jakarta.ws.rs-api-2.1.6.jar  jakarta.inject-2.6.1.jar  
jersey-common-2.34.jar  jakarta.annotation-api-1.3.5.jar  
osgi-resource-locator-1.0.3.jar  jersey-server-2.34.jar  
jakarta.validation-api-2.0.2.jar  jersey-container-servlet-2.34.jar  
jersey-container-servlet-core-2.34.jar  jersey-hk2-2.34.jar  
hk2-locator-2.6.1.jar  aopall

Re: [Java 17] --add-exports required?

2022-06-22 Thread Yang,Jie(INF)
Hi, Greg

"--add-exports java.base/sun.nio.ch=ALL-UNNAMED " does not need to be added 
when SPARK-33772 is completed, so in order to answer your question, I need more 
details for testing:
1.  Where can I download Java 17 (Temurin-17+35)?
2.  What test commands do you use?

Yang Jie

On 2022/6/23, 12:54, "Greg Kopff"  wrote:

Hi.

According to the release notes[1], and specifically the ticket Build and 
Run Spark on Java 17 (SPARK-33772)[2], Spark now supports running on Java 17.

However, using Java 17 (Temurin-17+35) with Maven (3.8.6) and 
maven-surefire-plugin (3.0.0-M7), when running a unit test that uses Spark 
(3.3.0), it fails with:

java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ 
(in unnamed module @0x1e7ba8d9) cannot access class sun.nio.ch.DirectBuffer (in 
module java.base) because module java.base does not export sun.nio.ch to 
unnamed module @0x1e7ba8d9

The full stack is:

java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ 
(in unnamed module @0x1e7ba8d9) cannot access class sun.nio.ch.DirectBuffer (in 
module java.base) because module java.base does not export sun.nio.ch to 
unnamed module @0x1e7ba8d9
  at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala:213)
  at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala)
  at 
org.apache.spark.storage.BlockManagerMasterEndpoint.(BlockManagerMasterEndpoint.scala:114)
  at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:353)
  at 
org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:290)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:339)
  at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
  at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:279)
  at org.apache.spark.SparkContext.(SparkContext.scala:464)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2704)
  at 
org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:953)
  at scala.Option.getOrElse(Option.scala:189)
  at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:947)
  […]

There is a recent StackOverflow question "Java 17 solution for Spark - 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.spark.storage.StorageUtils"[3], which was asked only 2 months ago, 
but this pre-dated the Spark 3.3.0 release, and thus predated official support 
for Java 17.  The solution proposed there results in us adding this 
configuration to the Surefire plugin:


  --add-exports java.base/sun.nio.ch=ALL-UNNAMED


And, yes, this works.

Now, I understand what this flag achieves … without it the JVM module
system won't allow Spark to use the sun.nio.ch.DirectBuffer class.  My question
is whether the requirement to add this flag is currently documented somewhere?  I
couldn't find it, and it's likely to start affecting people when they switch to
Java 17.  Right now the web is mostly full of suggestions to use an earlier
version of Java.

Cheers,

—
Greg.


[1]: https://spark.apache.org/releases/spark-release-3-3-0.html
[2]: https://issues.apache.org/jira/browse/SPARK-33772
[3]: https://stackoverflow.com/questions/72230174
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Reverse proxy for Spark UI on Kubernetes

2022-05-17 Thread bo yang
Yes, it should be possible, any interest to work on this together? Need
more hands to add more features here :)

On Tue, May 17, 2022 at 2:06 PM Holden Karau  wrote:

> Could we make it do the same sort of history server fallback approach?
>
> On Tue, May 17, 2022 at 10:41 PM bo yang  wrote:
>
>> It is like Web Application Proxy in YARN (
>> https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html),
>> to provide easy access for Spark UI when the Spark application is running.
>>
>> When running Spark on Kubernetes with S3, there is no YARN. The reverse
>> proxy here is to behave like that Web Application Proxy. It will
>> simplify settings to access Spark UI on Kubernetes.
>>
>>
>> On Mon, May 16, 2022 at 11:46 PM wilson  wrote:
>>
>>> what's the advantage of using reverse proxy for spark UI?
>>>
>>> Thanks
>>>
>>> On Tue, May 17, 2022 at 1:47 PM bo yang  wrote:
>>>
>>>> Hi Spark Folks,
>>>>
>>>> I built a web reverse proxy to access Spark UI on Kubernetes (working
>>>> together with
>>>> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator). Want to
>>>> share here in case other people have similar need.
>>>>
>>>> The reverse proxy code is here:
>>>> https://github.com/datapunchorg/spark-ui-reverse-proxy
>>>>
>>>> Let me know if anyone wants to use or would like to contribute.
>>>>
>>>> Thanks,
>>>> Bo
>>>>
>>>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: Reverse proxy for Spark UI on Kubernetes

2022-05-17 Thread bo yang
It is like Web Application Proxy in YARN (
https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html),
to provide easy access for Spark UI when the Spark application is running.

When running Spark on Kubernetes with S3, there is no YARN. The reverse
proxy here is to behave like that Web Application Proxy. It will
simplify settings to access Spark UI on Kubernetes.


On Mon, May 16, 2022 at 11:46 PM wilson  wrote:

> what's the advantage of using reverse proxy for spark UI?
>
> Thanks
>
> On Tue, May 17, 2022 at 1:47 PM bo yang  wrote:
>
>> Hi Spark Folks,
>>
>> I built a web reverse proxy to access Spark UI on Kubernetes (working
>> together with
>> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator). Want to
>> share here in case other people have similar need.
>>
>> The reverse proxy code is here:
>> https://github.com/datapunchorg/spark-ui-reverse-proxy
>>
>> Let me know if anyone wants to use or would like to contribute.
>>
>> Thanks,
>> Bo
>>
>>


Re: Reverse proxy for Spark UI on Kubernetes

2022-05-17 Thread bo yang
Thanks Holden :)

On Mon, May 16, 2022 at 11:12 PM Holden Karau  wrote:

> Oh that’s rad 
>
> On Tue, May 17, 2022 at 7:47 AM bo yang  wrote:
>
>> Hi Spark Folks,
>>
>> I built a web reverse proxy to access Spark UI on Kubernetes (working
>> together with
>> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator). Want to
>> share here in case other people have similar need.
>>
>> The reverse proxy code is here:
>> https://github.com/datapunchorg/spark-ui-reverse-proxy
>>
>> Let me know if anyone wants to use or would like to contribute.
>>
>> Thanks,
>> Bo
>>
>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Reverse proxy for Spark UI on Kubernetes

2022-05-16 Thread bo yang
Hi Spark Folks,

I built a web reverse proxy to access Spark UI on Kubernetes (working
together with https://github.com/GoogleCloudPlatform/spark-on-k8s-operator).
Want to share here in case other people have similar need.

The reverse proxy code is here:
https://github.com/datapunchorg/spark-ui-reverse-proxy

Let me know if anyone wants to use or would like to contribute.

Thanks,
Bo


Re: Spark Parquet write OOM

2022-03-01 Thread Yang,Jie(INF)
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the
DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`, which is a
Java option.

However, we'd better check the amount of memory being allocated, because
`-XX:MaxDirectMemorySize` defaults to the same capacity as `-Xmx`.
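
A minimal sketch of where those knobs would go (the values are illustrative, not recommendations): the direct-memory ceiling is raised through the executor JVM options, and the shuffle partition count from plan 1 in the original mail is a plain SQL config.

```
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("parquet-write-oom-sketch")
    # Plan 1 from the original mail: more shuffle partitions -> smaller writer tasks.
    .config("spark.sql.shuffle.partitions", "400")
    # Raise the DirectByteBuffer ceiling, which otherwise defaults to roughly -Xmx.
    .config("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
    .getOrCreate()
)

df = spark.range(0, 1_000_000).withColumnRenamed("id", "value")
df.write.mode("overwrite").parquet("/tmp/parquet-oom-sketch")  # the original job writes to S3 instead
```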


From: Anil Dasari 
Date: Wednesday, March 2, 2022, 09:45
To: "user@spark.apache.org" 
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark DataFrame to S3 in Parquet format and it is failing with the
below exception.

I wanted to try the following to avoid the OOM:


  1.  Increase the default SQL shuffle partitions to reduce the load on the Parquet
writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for
other data structures, assuming the Parquet writer uses user memory.

I am not sure whether these fix the OOM issue, so I wanted to reach out to the community
for any suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
 at sun.misc.Unsafe.allocateMemory(Native Method)
 at java.nio.DirectByteBuffer.(DirectByteBuffer.java:127)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
 at 
org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
 at 
org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
 at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
 at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
 at 
org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
 at 
org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
 at 
org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
 at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
 at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
 at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
 ... 10 more
 Suppressed: 

Re: One click to run Spark on Kubernetes

2022-02-23 Thread bo yang
It uses Helm to deploy Spark Operator and Nginx. For other parts like
creating EKS, IAM role, node group, etc, it uses AWS SDK to provision those
AWS resources.

On Wed, Feb 23, 2022 at 11:28 AM Bjørn Jørgensen 
wrote:

> So if I get this right you will make a Helm <https://helm.sh> chart to
> deploy Spark and some other stuff on K8S?
>
> ons. 23. feb. 2022 kl. 17:49 skrev bo yang :
>
>> Hi Sarath, let's follow up offline on this.
>>
>> On Wed, Feb 23, 2022 at 8:32 AM Sarath Annareddy <
>> sarath.annare...@gmail.com> wrote:
>>
>>> Hi bo
>>>
>>> How do we start?
>>>
>>> Is there a plan? Onboarding, Arch/design diagram, tasks lined up etc
>>>
>>>
>>> Thanks
>>> Sarath
>>>
>>>
>>> Sent from my iPhone
>>>
>>> On Feb 23, 2022, at 10:27 AM, bo yang  wrote:
>>>
>>> 
>>> Hi Sarath, thanks for your interest and willing to contribute! The
>>> project supports local development using MiniKube. Similarly there is a one
>>> click command with one extra argument to deploy all components in MiniKube,
>>> and people could use that to develop on their local MacBook.
>>>
>>>
>>> On Wed, Feb 23, 2022 at 7:41 AM Sarath Annareddy <
>>> sarath.annare...@gmail.com> wrote:
>>>
>>>> Hi bo
>>>>
>>>> I am interested to contribute.
>>>> But I don’t have free access to any cloud provider. Not sure how I can
>>>> get free access. I know Google, aws, azure only provides temp free access,
>>>> it may not be sufficient.
>>>>
>>>> Guidance is appreciated.
>>>>
>>>> Sarath
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Feb 23, 2022, at 2:01 AM, bo yang  wrote:
>>>>
>>>> 
>>>>
>>>> Right, normally people start with simple script, then add more stuff,
>>>> like permission and more components. After some time, people want to run
>>>> the script consistently in different environments. Things will become
>>>> complex.
>>>>
>>>> That is why we want to see whether people have interest for such a "one
>>>> click" tool to make things easy.
>>>>
>>>>
>>>> On Tue, Feb 22, 2022 at 11:31 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> There are two distinct actions here; namely Deploy and Run.
>>>>>
>>>>> Deployment can be done by command line script with autoscaling. In the
>>>>> newer versions of Kubernnetes you don't even need to specify the node
>>>>> types, you can leave it to the Kubernetes cluster  to scale up and down 
>>>>> and
>>>>> decide on node type.
>>>>>
>>>>> The second point is the running spark that you will need to submit.
>>>>> However, that depends on setting up access permission, use of service
>>>>> accounts, pulling the correct dockerfiles for the driver and the 
>>>>> executors.
>>>>> Those details add to the complexity.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 23 Feb 2022 at 04:06, bo yang  wrote:
>>>>>
>>>>>> Hi Spark Community,
>>>>>>
>>>>>> We built an open source tool to deploy and run Spark on Kubernetes
>>>>>> with a one click command. For example, on AWS, it could automatically
>>>>>> create an EKS cluster, node group, NGINX ingress, and Spark Operator. 
>>>>>> Then
>>>>>> you will be able to use curl or a CLI tool to submit Spark application.
>>>>>> After the deployment, you could also install Uber Remote Shuffle Service 
>>>>>> to
>>>>>> enable Dynamic Allocation on Kuberentes.
>>>>>>
>>>>>> Anyone interested in using or working together on such a tool?
>>>>>>
>>>>>> Thanks,
>>>>>> Bo
>>>>>>
>>>>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: One click to run Spark on Kubernetes

2022-02-23 Thread bo yang
Hi Sarath, let's follow up offline on this.

On Wed, Feb 23, 2022 at 8:32 AM Sarath Annareddy 
wrote:

> Hi bo
>
> How do we start?
>
> Is there a plan? Onboarding, Arch/design diagram, tasks lined up etc
>
>
> Thanks
> Sarath
>
>
> Sent from my iPhone
>
> On Feb 23, 2022, at 10:27 AM, bo yang  wrote:
>
> 
> Hi Sarath, thanks for your interest and willing to contribute! The project
> supports local development using MiniKube. Similarly there is a one click
> command with one extra argument to deploy all components in MiniKube, and
> people could use that to develop on their local MacBook.
>
>
> On Wed, Feb 23, 2022 at 7:41 AM Sarath Annareddy <
> sarath.annare...@gmail.com> wrote:
>
>> Hi bo
>>
>> I am interested to contribute.
>> But I don’t have free access to any cloud provider. Not sure how I can
>> get free access. I know Google, aws, azure only provides temp free access,
>> it may not be sufficient.
>>
>> Guidance is appreciated.
>>
>> Sarath
>>
>> Sent from my iPhone
>>
>> On Feb 23, 2022, at 2:01 AM, bo yang  wrote:
>>
>> 
>>
>> Right, normally people start with simple script, then add more stuff,
>> like permission and more components. After some time, people want to run
>> the script consistently in different environments. Things will become
>> complex.
>>
>> That is why we want to see whether people have interest for such a "one
>> click" tool to make things easy.
>>
>>
>> On Tue, Feb 22, 2022 at 11:31 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> There are two distinct actions here; namely Deploy and Run.
>>>
>>> Deployment can be done by command line script with autoscaling. In the
>>> newer versions of Kubernnetes you don't even need to specify the node
>>> types, you can leave it to the Kubernetes cluster  to scale up and down and
>>> decide on node type.
>>>
>>> The second point is the running spark that you will need to submit.
>>> However, that depends on setting up access permission, use of service
>>> accounts, pulling the correct dockerfiles for the driver and the executors.
>>> Those details add to the complexity.
>>>
>>> Thanks
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 23 Feb 2022 at 04:06, bo yang  wrote:
>>>
>>>> Hi Spark Community,
>>>>
>>>> We built an open source tool to deploy and run Spark on Kubernetes with
>>>> a one click command. For example, on AWS, it could automatically create an
>>>> EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will
>>>> be able to use curl or a CLI tool to submit Spark application. After the
>>>> deployment, you could also install Uber Remote Shuffle Service to enable
>>>> Dynamic Allocation on Kuberentes.
>>>>
>>>> Anyone interested in using or working together on such a tool?
>>>>
>>>> Thanks,
>>>> Bo
>>>>
>>>>


Re: One click to run Spark on Kubernetes

2022-02-23 Thread bo yang
Hi Sarath, thanks for your interest and willing to contribute! The project
supports local development using MiniKube. Similarly there is a one click
command with one extra argument to deploy all components in MiniKube, and
people could use that to develop on their local MacBook.


On Wed, Feb 23, 2022 at 7:41 AM Sarath Annareddy 
wrote:

> Hi bo
>
> I am interested to contribute.
> But I don’t have free access to any cloud provider. Not sure how I can get
> free access. I know Google, aws, azure only provides temp free access, it
> may not be sufficient.
>
> Guidance is appreciated.
>
> Sarath
>
> Sent from my iPhone
>
> On Feb 23, 2022, at 2:01 AM, bo yang  wrote:
>
> 
>
> Right, normally people start with simple script, then add more stuff, like
> permission and more components. After some time, people want to run the
> script consistently in different environments. Things will become complex.
>
> That is why we want to see whether people have interest for such a "one
> click" tool to make things easy.
>
>
> On Tue, Feb 22, 2022 at 11:31 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> There are two distinct actions here; namely Deploy and Run.
>>
>> Deployment can be done by command line script with autoscaling. In the
>> newer versions of Kubernnetes you don't even need to specify the node
>> types, you can leave it to the Kubernetes cluster  to scale up and down and
>> decide on node type.
>>
>> The second point is the running spark that you will need to submit.
>> However, that depends on setting up access permission, use of service
>> accounts, pulling the correct dockerfiles for the driver and the executors.
>> Those details add to the complexity.
>>
>> Thanks
>>
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 23 Feb 2022 at 04:06, bo yang  wrote:
>>
>>> Hi Spark Community,
>>>
>>> We built an open source tool to deploy and run Spark on Kubernetes with
>>> a one click command. For example, on AWS, it could automatically create an
>>> EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will
>>> be able to use curl or a CLI tool to submit Spark application. After the
>>> deployment, you could also install Uber Remote Shuffle Service to enable
>>> Dynamic Allocation on Kuberentes.
>>>
>>> Anyone interested in using or working together on such a tool?
>>>
>>> Thanks,
>>> Bo
>>>
>>>


Re: One click to run Spark on Kubernetes

2022-02-23 Thread bo yang
Right, normally people start with simple script, then add more stuff, like
permission and more components. After some time, people want to run the
script consistently in different environments. Things will become complex.

That is why we want to see whether people have interest for such a "one
click" tool to make things easy.


On Tue, Feb 22, 2022 at 11:31 PM Mich Talebzadeh 
wrote:

> Hi,
>
> There are two distinct actions here; namely Deploy and Run.
>
> Deployment can be done by command line script with autoscaling. In the
> newer versions of Kubernnetes you don't even need to specify the node
> types, you can leave it to the Kubernetes cluster  to scale up and down and
> decide on node type.
>
> The second point is the running spark that you will need to submit.
> However, that depends on setting up access permission, use of service
> accounts, pulling the correct dockerfiles for the driver and the executors.
> Those details add to the complexity.
>
> Thanks
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 23 Feb 2022 at 04:06, bo yang  wrote:
>
>> Hi Spark Community,
>>
>> We built an open source tool to deploy and run Spark on Kubernetes with a
>> one click command. For example, on AWS, it could automatically create an
>> EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will
>> be able to use curl or a CLI tool to submit Spark application. After the
>> deployment, you could also install Uber Remote Shuffle Service to enable
>> Dynamic Allocation on Kuberentes.
>>
>> Anyone interested in using or working together on such a tool?
>>
>> Thanks,
>> Bo
>>
>>


Re: One click to run Spark on Kubernetes

2022-02-22 Thread bo yang
Merging in another email from Prasad. It could co-exist with Livy; Livy is
similar to the REST Service + Spark Operator. Unfortunately, Livy is not
very active right now.

To Amihay, the link is: https://github.com/datapunchorg/punch.

On Tue, Feb 22, 2022 at 8:53 PM amihay gonen  wrote:

> Can you share link to the source?
>
> On Wednesday, Feb 23, 2022, 6:52, bo yang wrote:
>
>> We do not have SaaS yet. Now it is an open source project we build in our
>> part time , and we welcome more people working together on that.
>>
>> You could specify cluster size (EC2 instance type and number of
>> instances) and run it for 1 hour. Then you could run one click command to
>> destroy the cluster. It is possible to merge these steps as well, and
>> provide a "serverless" experience. That is in our TODO list :)
>>
>>
>> On Tue, Feb 22, 2022 at 8:36 PM Bitfox  wrote:
>>
>>> How can I specify the cluster memory and cores?
>>> For instance, I want to run a job with 16 cores and 300 GB memory for
>>> about 1 hour. Do you have the SaaS solution for this? I can pay as I did.
>>>
>>> Thanks
>>>
>>> On Wed, Feb 23, 2022 at 12:21 PM bo yang  wrote:
>>>
>>>> It is not a standalone spark cluster. In some details, it deploys a
>>>> Spark Operator (
>>>> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) and an
>>>> extra REST Service. When people submit Spark application to that REST
>>>> Service, the REST Service will create a CRD inside the Kubernetes cluster.
>>>> Then Spark Operator will pick up the CRD and launch the Spark application.
>>>> The one click tool intends to hide these details, so people could just
>>>> submit Spark and do not need to deal with too many deployment details.
>>>>
>>>> On Tue, Feb 22, 2022 at 8:09 PM Bitfox  wrote:
>>>>
>>>>> Can it be a cluster installation of spark? or just the standalone node?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Wed, Feb 23, 2022 at 12:06 PM bo yang  wrote:
>>>>>
>>>>>> Hi Spark Community,
>>>>>>
>>>>>> We built an open source tool to deploy and run Spark on Kubernetes
>>>>>> with a one click command. For example, on AWS, it could automatically
>>>>>> create an EKS cluster, node group, NGINX ingress, and Spark Operator. 
>>>>>> Then
>>>>>> you will be able to use curl or a CLI tool to submit Spark application.
>>>>>> After the deployment, you could also install Uber Remote Shuffle Service 
>>>>>> to
>>>>>> enable Dynamic Allocation on Kuberentes.
>>>>>>
>>>>>> Anyone interested in using or working together on such a tool?
>>>>>>
>>>>>> Thanks,
>>>>>> Bo
>>>>>>
>>>>>>


Re: One click to run Spark on Kubernetes

2022-02-22 Thread bo yang
We do not have SaaS yet. Now it is an open source project we build in our
part time , and we welcome more people working together on that.

You could specify cluster size (EC2 instance type and number of instances)
and run it for 1 hour. Then you could run one click command to destroy the
cluster. It is possible to merge these steps as well, and provide a
"serverless" experience. That is in our TODO list :)


On Tue, Feb 22, 2022 at 8:36 PM Bitfox  wrote:

> How can I specify the cluster memory and cores?
> For instance, I want to run a job with 16 cores and 300 GB memory for
> about 1 hour. Do you have the SaaS solution for this? I can pay as I did.
>
> Thanks
>
> On Wed, Feb 23, 2022 at 12:21 PM bo yang  wrote:
>
>> It is not a standalone spark cluster. In some details, it deploys a Spark
>> Operator (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator)
>> and an extra REST Service. When people submit Spark application to that
>> REST Service, the REST Service will create a CRD inside the
>> Kubernetes cluster. Then Spark Operator will pick up the CRD and launch the
>> Spark application. The one click tool intends to hide these details, so
>> people could just submit Spark and do not need to deal with too many
>> deployment details.
>>
>> On Tue, Feb 22, 2022 at 8:09 PM Bitfox  wrote:
>>
>>> Can it be a cluster installation of spark? or just the standalone node?
>>>
>>> Thanks
>>>
>>> On Wed, Feb 23, 2022 at 12:06 PM bo yang  wrote:
>>>
>>>> Hi Spark Community,
>>>>
>>>> We built an open source tool to deploy and run Spark on Kubernetes with
>>>> a one click command. For example, on AWS, it could automatically create an
>>>> EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will
>>>> be able to use curl or a CLI tool to submit Spark application. After the
>>>> deployment, you could also install Uber Remote Shuffle Service to enable
>>>> Dynamic Allocation on Kuberentes.
>>>>
>>>> Anyone interested in using or working together on such a tool?
>>>>
>>>> Thanks,
>>>> Bo
>>>>
>>>>


Re: One click to run Spark on Kubernetes

2022-02-22 Thread bo yang
It is not a standalone spark cluster. In some details, it deploys a Spark
Operator (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) and
an extra REST Service. When people submit Spark application to that REST
Service, the REST Service will create a CRD inside the Kubernetes cluster.
Then Spark Operator will pick up the CRD and launch the Spark application.
The one click tool intends to hide these details, so people could just
submit Spark and do not need to deal with too many deployment details.
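
To make the flow concrete, here is a minimal sketch, not taken from the tool itself, of what that REST-service step boils down to: creating a SparkApplication custom resource with the Kubernetes Python client, which the Spark Operator then picks up and turns into driver and executor pods. The namespace, image tag, service account and jar path are illustrative assumptions.

```
from kubernetes import client, config

config.load_kube_config()

spark_app = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "spark-pi", "namespace": "default"},
    "spec": {
        "type": "Scala",
        "mode": "cluster",
        "image": "apache/spark:3.3.0",  # illustrative image tag
        "mainClass": "org.apache.spark.examples.SparkPi",
        "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples.jar",
        "sparkVersion": "3.3.0",
        "driver": {"cores": 1, "memory": "1g", "serviceAccount": "spark"},
        "executor": {"cores": 1, "instances": 2, "memory": "1g"},
    },
}

# The Spark Operator watches this custom resource and launches the driver/executor pods.
client.CustomObjectsApi().create_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="default",
    plural="sparkapplications",
    body=spark_app,
)
```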

On Tue, Feb 22, 2022 at 8:09 PM Bitfox  wrote:

> Can it be a cluster installation of spark? or just the standalone node?
>
> Thanks
>
> On Wed, Feb 23, 2022 at 12:06 PM bo yang  wrote:
>
>> Hi Spark Community,
>>
>> We built an open source tool to deploy and run Spark on Kubernetes with a
>> one click command. For example, on AWS, it could automatically create an
>> EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will
>> be able to use curl or a CLI tool to submit Spark application. After the
>> deployment, you could also install Uber Remote Shuffle Service to enable
>> Dynamic Allocation on Kuberentes.
>>
>> Anyone interested in using or working together on such a tool?
>>
>> Thanks,
>> Bo
>>
>>


One click to run Spark on Kubernetes

2022-02-22 Thread bo yang
Hi Spark Community,

We built an open source tool to deploy and run Spark on Kubernetes with a
one click command. For example, on AWS, it could automatically create an
EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will
be able to use curl or a CLI tool to submit Spark application. After the
deployment, you could also install Uber Remote Shuffle Service to enable
Dynamic Allocation on Kuberentes.

Anyone interested in using or working together on such a tool?

Thanks,
Bo


Re: [ANNOUNCE] Apache Kyuubi (Incubating) released 1.4.1-incubating

2022-01-30 Thread Vino Yang
Hi,

As you can found the description from the website[1] of Apache Kyuubi
(incubating):

"Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines."

[1]: https://kyuubi.apache.org/

Best,
Vino

Bitfox  wrote on Monday, January 31, 2022, at 14:49:
>
> What’s the difference between Spark and Kyuubi?
>
> Thanks
>
> On Mon, Jan 31, 2022 at 2:45 PM Vino Yang  wrote:
>>
>> Hi all,
>>
>> The Apache Kyuubi (Incubating) community is pleased to announce that
>> Apache Kyuubi (Incubating) 1.4.1-incubating has been released!
>>
>> Apache Kyuubi (Incubating) is a distributed multi-tenant JDBC server for
>> large-scale data processing and analytics, built on top of Apache Spark
>> and designed to support more engines (i.e. Apache Flink).
>>
>> Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
>> for end-users to manipulate large-scale data with pre-programmed and
>> extensible Spark SQL engines.
>>
>> We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
>> and data lakes.
>>
>> This "out-of-the-box" model minimizes the barriers and costs for end-users
>> to use Spark at the client side.
>>
>> At the server-side, Kyuubi server and engine's multi-tenant architecture
>> provides the administrators a way to achieve computing resource isolation,
>> data security, high availability, high client concurrency, etc.
>>
>> The full release notes and download links are available at:
>> Release Notes: https://kyuubi.apache.org/release/1.4.1-incubating.html
>>
>> To learn more about Apache Kyuubi (Incubating), please see
>> https://kyuubi.apache.org/
>>
>> Kyuubi Resources:
>> - Issue: https://github.com/apache/incubator-kyuubi/issues
>> - Mailing list: d...@kyuubi.apache.org
>>
>> We would like to thank all contributors of the Kyuubi community and 
>> Incubating
>> community who made this release possible!
>>
>> Thanks,
>> On behalf of Apache Kyuubi (Incubating) community

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ANNOUNCE] Apache Kyuubi (Incubating) released 1.4.1-incubating

2022-01-30 Thread Vino Yang
Hi all,

The Apache Kyuubi (Incubating) community is pleased to announce that
Apache Kyuubi (Incubating) 1.4.1-incubating has been released!

Apache Kyuubi (Incubating) is a distributed multi-tenant JDBC server for
large-scale data processing and analytics, built on top of Apache Spark
and designed to support more engines (i.e. Apache Flink).

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and data lakes.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.4.1-incubating.html

To learn more about Apache Kyuubi (Incubating), please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/incubator-kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community and
Incubating
community who made this release possible!

Thanks,
On behalf of Apache Kyuubi (Incubating) community


Re: Log4J 2 Support

2021-11-10 Thread Yang,Jie(INF)
It may be more feasible to replace the current slf4j + log4j combination with the 
log4j2 API. Some projects that Spark relies on may also use log4j at the code level, 
such as EventCounter and ContainerLogAppender in Hadoop, so directly removing the 
dependency on log4j may break some code-level dependencies.



From: Stephen Coy 
Date: Wednesday, November 10, 2021 07:16
To: Sean Owen 
Cc: User , Ajay Kumar 
Subject: Re: Log4J 2 Support

Hi Sean,

I have had a more detailed look at what Spark is doing with log4j APIs, and at 
this point I suspect that a log4j 2.x migration might be more appropriate at the 
code level.

That still does not solve the libraries issue though. That would need more 
investigation.

I could be tempted to tackle it if there is enough interest.

Cheers,

Steve C


On 10 Nov 2021, at 9:42 am, Sean Owen 
mailto:sro...@gmail.com>> wrote:

Yep that's what I tried, roughly - there is an old jira about it. The issue is 
that Spark does need to configure some concrete logging framework in a few 
cases, as do other libs, and that isn't what the shims cover. Could be possible 
now or with more cleverness but the simple thing didn't work out IIRC.
On Tue, Nov 9, 2021, 4:32 PM Stephen Coy 
mailto:s...@infomedia.com.au>> wrote:
Hi there,

It’s true that the preponderance of log4j 1.2.x in many existing live projects 
is kind of a pain in the butt.

But there is a solution.

1. Migrate all Spark code to use slf4j APIs;

2. Exclude log4j 1.2.x from any dependencies sucking it in;

3. Include the log4j-over-slf4j bridge jar and slf4j-api jars;

4. Choose your favourite modern logging implementation and add it as a 
"runtime" dependency together with its slf4j binding jar (if needed).

In fact in the short term you can replace steps 1 and 2 with "remove the log4j 
1.2.17 jar from the distribution" and it should still work.
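For anyone trying this, a minimal sbt sketch of steps 2 to 4 (the versions and the choice of Logback here are assumptions, not a tested Spark build):

```
// build.sbt sketch -- versions and the Logback backend are assumptions
libraryDependencies ++= Seq(
  // step 2: keep log4j 1.2.x (and its slf4j binding) out of the transitive tree
  ("org.apache.spark" %% "spark-sql" % "2.4.8")
    .exclude("log4j", "log4j")
    .exclude("org.slf4j", "slf4j-log4j12"),

  // step 3: bridge remaining log4j 1.x API calls onto slf4j
  "org.slf4j" % "slf4j-api"        % "1.7.30",
  "org.slf4j" % "log4j-over-slf4j" % "1.7.30",

  // step 4: a modern implementation as a runtime-only dependency
  "ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime
)
```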

The slf4j project also includes a commons-logging shim for capturing its output 
too.

FWIW, the slf4j project is run by one of the original log4j developers.

Cheers,

Steve C



On 9 Nov 2021, at 11:11 pm, Sean Owen 
mailto:sro...@gmail.com>> wrote:

No plans that I know of. It's not that Spark uses it so much as its 
dependencies. I tried and failed to upgrade it a couple of years ago. You are 
welcome to try, and open a PR if successful.

On Tue, Nov 9, 2021 at 6:09 AM Ajay Kumar 
mailto:ajay.praja...@gmail.com>> wrote:
Hi Team,
We wanted to send Spark executor logs to a centralized logging server using a TCP 
socket. I see that the Spark log4j version is very old (1.2.17) and it does not 
support JSON logs over TCP sockets on containers.
I wanted to know what the plan is for upgrading the log4j version to log4j2.
Thanks in advance.
Regards,
Ajay




Re: Unsubscribe

2021-08-03 Thread Howard Yang
Unsubscribe

Edward Wu wrote on Tue, Aug 3, 2021 at 4:15 PM:

> Unsubscribe
>


Re: Unsubscribe

2021-07-13 Thread Howard Yang
Unsubscribe

Eric Wang wrote on Mon, Jul 12, 2021 at 7:31 AM:

> Unsubscribe
>
> On Sun, Jul 11, 2021 at 9:59 PM Rishi Raj Tandon <
> tandon.rishi...@gmail.com> wrote:
>
>> Unsubscribe
>>
>


RE: Spark UI Storage Memory

2020-12-04 Thread Jack Yang
unsubscribe


Re: How to submit a job via REST API?

2020-11-25 Thread Zhou Yang
Hi all,

I found the solution through the source code. Appending the --conf key-value pairs into
`sparkProperties` works.
For example:

./spark-submit \
  --conf foo=bar \
  xxx

is equivalent to

{
  "xxx" : "yyy",
  "sparkProperties" : {
    "foo" : "bar"
  }
}

Thanks for your reply.
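For completeness, a minimal Scala sketch of posting such a request to the standalone master's REST endpoint. The host, jar, class and version are placeholders, the field names follow the gist linked in the quoted message below, and "foo": "bar" plays the role of --conf foo=bar:

```
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object RestSubmit {
  def main(args: Array[String]): Unit = {
    // Placeholder host/jar/class/version values.
    val payload =
      """{
        |  "action": "CreateSubmissionRequest",
        |  "appResource": "hdfs:///jars/my-app.jar",
        |  "mainClass": "com.example.MyApp",
        |  "appArgs": ["arg1"],
        |  "clientSparkVersion": "2.4.0",
        |  "environmentVariables": { "SPARK_ENV_LOADED": "1" },
        |  "sparkProperties": {
        |    "spark.master": "spark://master-host:6066",
        |    "spark.app.name": "my-app",
        |    "spark.submit.deployMode": "cluster",
        |    "foo": "bar"
        |  }
        |}""".stripMargin

    val request = HttpRequest.newBuilder()
      .uri(URI.create("http://master-host:6066/v1/submissions/create"))
      .header("Content-Type", "application/json;charset=UTF-8")
      .POST(HttpRequest.BodyPublishers.ofString(payload))
      .build()

    // Requires Java 11+ for java.net.http
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println(response.body()) // on success the body contains the submissionId
  }
}
```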

On Nov 25, 2020, at 3:55 PM, vaquar khan 
mailto:vaquar.k...@gmail.com>> wrote:

Hi Yang,

Please find following link

https://stackoverflow.com/questions/63677736/spark-application-as-a-rest-service/63678337#63678337

Regards,
Vaquar khan

On Wed, Nov 25, 2020 at 12:40 AM Sonal Goyal 
mailto:sonalgoy...@gmail.com>> wrote:
You should be able to supply the --conf and its values as part of appArgs 
argument

Cheers,
Sonal
Nube Technologies<http://www.nubetech.co/>
Join me at
Data Con LA Oct 23 | Big Data Conference Europe. Nov 24 | GIDS AI/ML Dec 3




On Tue, Nov 24, 2020 at 11:31 AM Dennis Suhari 
mailto:d.suh...@icloud.com.invalid>> wrote:
Hi Yang,

I am using Livy Server for submitting jobs.

Br,

Dennis



Sent from my iPhone

On 24 Nov 2020, at 03:34, Zhou Yang 
mailto:zhouyang...@outlook.com>> wrote:


Dear experts,

I found a convenient way to submit a job via the REST API at 
https://gist.github.com/arturmkrtchyan/5d8559b2911ac951d34a#file-submit_job-sh.
But I did not know whether I can append a `--conf` parameter like I do in 
spark-submit. Can someone help me with this issue?

Regards, Yang



--
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago



How to submit a job via REST API?

2020-11-23 Thread Zhou Yang
Dear experts,

I found a convenient way to submit a job via the REST API at 
https://gist.github.com/arturmkrtchyan/5d8559b2911ac951d34a#file-submit_job-sh.
But I did not know whether I can append a `--conf` parameter like I do in 
spark-submit. Can someone help me with this issue?

Regards, Yang



Anyone interested in Remote Shuffle Service

2020-10-21 Thread bo yang
Hi Spark Users,

Uber open sourced Remote Shuffle Service (
https://github.com/uber/RemoteShuffleService ) recently. It works with the open
source Spark version without any code changes needed, and can store shuffle
data on machines separate from the Spark executors.

Anyone interested in trying it? We are also still actively developing it. Please let
us know if you are interested in working together on it as well :)

Best,
Bo


unsubscribe

2019-06-24 Thread Song Yang



How does org.apache.spark.sql.catalyst.util.MapData support hash lookup?

2019-05-08 Thread Shawn Yang
Hi guys,
I'm reading the Spark source code. When I read
org.apache.spark.sql.catalyst.util.ArrayBasedMapData and
org.apache.spark.sql.catalyst.expressions.UnsafeMapData, I can't understand
how they support hash lookup. Is there anything I'm missing?


Re: unsubscribe

2019-04-27 Thread Song Yang
>
> unsubscribe
>


Re: Spark Profiler

2019-03-28 Thread bo yang
Yeah, these options are very valuable. Just to add another option :) We built
a JVM profiler (https://github.com/uber-common/jvm-profiler) to monitor and
profile Spark applications at large scale (e.g. sending metrics to Kafka /
Hive for batch analysis). People could try it as well.
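A rough sketch of attaching it from code (the jar path and name are assumptions; these settings are normally passed via spark-submit --conf instead, and the reporter options are documented in the jvm-profiler README):

```
import org.apache.spark.sql.SparkSession

// The agent jar must be reachable by every executor; the path below is a placeholder.
val spark = SparkSession.builder()
  .appName("profiled-job")
  .config("spark.jars", "hdfs:///lib/jvm-profiler-1.0.0.jar")
  .config("spark.executor.extraJavaOptions", "-javaagent:jvm-profiler-1.0.0.jar")
  .getOrCreate()
```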


On Wed, Mar 27, 2019 at 1:49 PM Jack Kolokasis 
wrote:

> Thanks for your reply.  Your help is very valuable and all these links are
> helpful (especially your example)
>
> Best Regards
>
> --Iacovos
> On 3/27/19 10:42 PM, Luca Canali wrote:
>
> I find that the Spark metrics system is quite useful to gather resource
> utilization metrics of Spark applications, including CPU, memory and I/O.
>
> If you are interested an example how this works for us at:
> https://db-blog.web.cern.ch/blog/luca-canali/2019-02-performance-dashboard-apache-spark
> If instead you are rather looking at ways to instrument your Spark code
> with performance metrics, Spark task metrics and event listeners are quite
> useful for that. See also
> https://github.com/apache/spark/blob/master/docs/monitoring.md and
> https://github.com/LucaCanali/sparkMeasure
>
>
>
> Regards,
>
> Luca
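A tiny sketch of the sparkMeasure usage mentioned above, assuming `spark` is an active SparkSession and the sparkMeasure artifact is already on the classpath:

```
// Wrap a workload to collect and print aggregated stage/task metrics.
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure {
  spark.sql("select count(*) from range(1000) cross join range(1000)").show()
}
```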
>
>
>
> *From:* manish ranjan  
> *Sent:* Tuesday, March 26, 2019 15:24
> *To:* Jack Kolokasis  
> *Cc:* user  
> *Subject:* Re: Spark Profiler
>
>
>
> I have found Ganglia very helpful in understanding network I/O, CPU and
> memory usage for a given Spark cluster.
>
> I have not used it, but have heard good things about Dr. Elephant (which I
> think was contributed by LinkedIn, but I'm not 100% sure).
>
>
>
> On Tue, Mar 26, 2019, 5:59 AM Jack Kolokasis 
> wrote:
>
> Hello all,
>
>  I am looking for a spark profiler to trace my application to find
> the bottlenecks. I need to trace CPU usage, Memory Usage and I/O usage.
>
> I am looking forward for your reply.
>
> --Iacovos
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Using Apache Kylin as data source for Spark

2018-05-25 Thread Li Yang
That is very useful~~  :-)

On Fri, May 18, 2018 at 11:56 AM, ShaoFeng Shi 
wrote:

> Hello, Kylin and Spark users,
>
> A doc was newly added to the Apache Kylin website on how to use Kylin as a
> data source in Spark.
> This can help users who want to use Spark to analyze the aggregated
> Cube data.
>
> https://kylin.apache.org/docs23/tutorial/spark.html
>
> Thanks for your attention.
>
> --
> Best regards,
>
> Shaofeng Shi 史少锋
>


how to kill application

2018-03-26 Thread Shuxin Yang

Hi,

   I apologize if this question was asked before. I tried to find the 
answer, but in vain.

   I'm running PySpark on Google Cloud Platform with Spark 2.2.0 and 
the YARN resource manager.

   Let S1 be the set of application ids collected via
curl 'http://127.0.0.1:18080/api/v1/applications?status=running', and S2 be 
the application ids collected via 'yarn application -list'.

   Sometimes I find S1 != S2. How could this take place?

   For those in the difference S2 - S1 (i.e. alive YARN app, dead 
Spark app), I can kill them using the command 'yarn application -kill <id>'.

   How can I kill those applications in S1 - S2 (i.e. alive Spark app, 
dead YARN app)? It looks like not closing the SparkContext could cause this 
problem. However, I'm not always able to close the context, for example when 
my program crashes prematurely.

   Many thanks in advance!

Shuxin Yang



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: HDP 2.5 - Python - Spark-On-Hbase

2017-06-26 Thread Weiqing Yang
For SHC documentation, please refer to the README in the SHC GitHub repo, which is kept
up-to-date.

On Mon, Jun 26, 2017 at 5:46 AM, ayan guha <guha.a...@gmail.com> wrote:

> Thanks all, I have found the correct version of the package. Probably the HDP
> documentation is a little behind.
>
> Best
> Ayan
>
> On Mon, 26 Jun 2017 at 2:16 pm, Mahesh Sawaiker <
> mahesh_sawai...@persistent.com> wrote:
>
>> Ayan,
>>
>> The location of the logging class was moved from Spark 1.6 to Spark 2.0.
>>
>> Looks like you are trying to run 1.6 code on 2.0. I have ported some code
>> like this before, and if you have access to the code you can recompile it by
>> changing the reference to the Logging class and directly using the slf4j Logger class;
>> most of the code tends to be easily portable.
>>
>>
>>
>> Following is the release note for Spark 2.0
>>
>>
>>
>> *Removals, Behavior Changes and Deprecations*
>>
>> *Removals*
>>
>> The following features have been removed in Spark 2.0:
>>
>>- Bagel
>>- Support for Hadoop 2.1 and earlier
>>- The ability to configure closure serializer
>>- HTTPBroadcast
>>- TTL-based metadata cleaning
>>- *Semi-private class org.apache.spark.Logging. We suggest you use
>>    slf4j directly.*
>>- SparkContext.metricsSystem
>>
>> Thanks,
>>
>> Mahesh
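To make that porting step concrete, a small sketch of the typical change (class name and message are made up):

```
import org.slf4j.LoggerFactory

// Before (Spark 1.6): class MyJob extends org.apache.spark.Logging { ... logInfo("...") ... }
// After (Spark 2.x):  org.apache.spark.Logging is gone, so use slf4j directly.
class MyJob extends Serializable {
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  def run(): Unit = {
    log.info("replaces the old logInfo(...) calls")
  }
}
```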
>>
>>
>>
>>
>>
>> *From:* ayan guha [mailto:guha.a...@gmail.com]
>> *Sent:* Monday, June 26, 2017 6:26 AM
>> *To:* Weiqing Yang
>> *Cc:* user
>> *Subject:* Re: HDP 2.5 - Python - Spark-On-Hbase
>>
>>
>>
>> Hi
>>
>>
>>
>> I am using following:
>>
>>
>>
>> --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories
>> http://repo.hortonworks.com/content/groups/public/
>>
>>
>>
>> Is it compatible with Spark 2.X? I would like to use it
>>
>>
>>
>> Best
>>
>> Ayan
>>
>>
>>
>> On Sat, Jun 24, 2017 at 2:09 AM, Weiqing Yang <yangweiqing...@gmail.com>
>> wrote:
>>
>> Yes.
>>
>> What SHC version you were using?
>>
>> If hitting any issues, you can post them in SHC github issues. There are
>> some threads about this.
>>
>>
>>
>> On Fri, Jun 23, 2017 at 5:46 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>> Hi
>>
>>
>>
>> Is it possible to use SHC from Hortonworks with pyspark? If so, any
>> working code sample available?
>>
>>
>>
>> Also, I faced an issue while running the samples with Spark 2.0
>>
>>
>>
>> "Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging"
>>
>>
>>
>> Any workaround?
>>
>>
>>
>> Thanks in advance
>>
>>
>>
>> --
>>
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best Regards,
>> Ayan Guha
> --
> Best Regards,
> Ayan Guha
>


Meetup in Taiwan

2017-06-25 Thread Yang Bryan
Hi,

I'm Bryan, the co-founder of Taiwan Spark User Group.
We discuss and share information on https://www.facebook.com/groups/spark.tw/.
We have a physical meetup twice a month.
Please help us get added to the official website.

We will also hold a coding competition about Spark; could we print the Spark
logo on the certificate of participation?

If you have any questions or suggestions, please feel free and let me know.
Thank you.

Best Regards,
Bryan


Re: HDP 2.5 - Python - Spark-On-Hbase

2017-06-23 Thread Weiqing Yang
Yes.
What SHC version were you using?
If you hit any issues, you can post them in the SHC GitHub issues. There are
some threads about this.

On Fri, Jun 23, 2017 at 5:46 AM, ayan guha  wrote:

> Hi
>
> Is it possible to use SHC from Hortonworks with pyspark? If so, any
> working code sample available?
>
> Also, I faced an issue while running the samples with Spark 2.0
>
> "Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging"
>
> Any workaround?
>
> Thanks in advance
>
> --
> Best Regards,
> Ayan Guha
>


Re: Use SQL Script to Write Spark SQL Jobs

2017-06-14 Thread bo yang
Hi Nihed,

Interesting to see Envelope. The idea is the same there! Thanks for sharing
:)

Best,
Bo


On Wed, Jun 14, 2017 at 12:22 AM, nihed mbarek <nihe...@gmail.com> wrote:

> Hi
>
> I already saw a project with the same idea.
> https://github.com/cloudera-labs/envelope
>
> Regards,
>
> On Wed, 14 Jun 2017 at 04:32, bo yang <bobyan...@gmail.com> wrote:
>
>> Thanks Benjamin and Ayan for the feedback! You kind of represent two
>> group of people who need such script tool or not. Personally I find the
>> script is very useful for myself to write ETL pipelines and daily jobs.
>> Let's see whether there are other people interested in such project.
>>
>> Best,
>> Bo
>>
>>
>>
>>
>>
>> On Mon, Jun 12, 2017 at 11:26 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> IMHO, this approach is not very useful.
>>>
>>> Firstly, 2 use cases mentioned in the project page:
>>>
>>> 1. Simplify spark development - I think the only thing can be done there
>>> is to come up with some boilerplate function, which essentially will take a
>>> sql and come back with a temp table name and a corresponding DF (Remember
>>> the project targets structured data sources only, not streaming or RDD).
>>> Building another mini-DSL on top of already fairly elaborate spark API
>>> never appealed to me.
>>>
>>> 2. Business Analysts using Spark - single word answer is Notebooks. Take
>>> your pick - Jupyter, Zeppelin, Hue.
>>>
>>> The case of "Spark is for Developers", IMHO, stemmed to the
>>> packaging/building overhead of spark apps. For Python users, this barrier
>>> is considerably lower (And maybe that is why I do not see a prominent
>>> need).
>>>
>>> But I can imagine the pain of a SQL developer coming into a scala/java
>>> world. I came from a hardcore SQL/DWH environment where I used to write SQL
>>> and SQL only. So SBT or MVN are still not my friend. Maybe someday they
>>> will. But learned them hard way, just because the value of using spark can
>>> offset the pain long long way. So, I think there is a need of spending time
>>> with the environment to get comfortable with it. And maybe, just maybe,
>>> using Nifi in case you miss drag/drop features too much :)
>>>
>>> But, these are my 2c, and sincerely humble opinion, and I wish you all
>>> the luck for your project.
>>>
>>> On Tue, Jun 13, 2017 at 3:23 PM, Benjamin Kim <bbuil...@gmail.com>
>>> wrote:
>>>
>>>> Hi Bo,
>>>>
>>>> +1 for your project. I come from the world of data warehouses, ETL, and
>>>> reporting analytics. There are many individuals who do not know or want to
>>>> do any coding. They are content with ANSI SQL and stick to it. ETL
>>>> workflows are also done without any coding using a drag-and-drop user
>>>> interface, such as Talend, SSIS, etc. There is a small amount of scripting
>>>> involved but not too much. I looked at what you are trying to do, and I
>>>> welcome it. This could open up Spark to the masses and shorten development
>>>> times.
>>>>
>>>> Cheers,
>>>> Ben
>>>>
>>>>
>>>> On Jun 12, 2017, at 10:14 PM, bo yang <bobyan...@gmail.com> wrote:
>>>>
>>>> Hi Aakash,
>>>>
>>>> Thanks for your willing to help :) It will be great if I could get more
>>>> feedback on my project. For example, is there any other people feeling the
>>>> need of using a script to write Spark job easily? Also, I would explore
>>>> whether it is possible that the Spark project takes some work to build such
>>>> a script based high level DSL.
>>>>
>>>> Best,
>>>> Bo
>>>>
>>>>
>>>> On Mon, Jun 12, 2017 at 12:14 PM, Aakash Basu <
>>>> aakash.spark@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> I work on Spark SQL and would pretty much be able to help you in this.
>>>>> Let me know your requirement.
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>> On 12-Jun-2017 11:00 AM, "bo yang" <bobyan...@gmail.com> wrote:
>>>>>
>>>>>> Hi Guys,
>>>>>>
>>>>>> I am writing a small open source project
>>>>>> <https://github.com/uber/uberscriptquery> to use SQL Script to write
>>>>>> Spark Jobs. Want to see if there are other people interested to use or
>>>>>> contribute to this project.
>>>>>>
>>>>>> The project is called UberScriptQuery (https://github.com/uber/
>>>>>> uberscriptquery). Sorry for the dumb name to avoid conflict with
>>>>>> many other names (Spark is registered trademark, thus I could not use 
>>>>>> Spark
>>>>>> in my project name).
>>>>>>
>>>>>> In short, it is a high level SQL-like DSL (Domain Specific Language)
>>>>>> on top of Spark. People can use that DSL to write Spark jobs without
>>>>>> worrying about Spark internal details. Please check README
>>>>>> <https://github.com/uber/uberscriptquery> in the project to get more
>>>>>> details.
>>>>>>
>>>>>> It will be great if I could get any feedback or suggestions!
>>>>>>
>>>>>> Best,
>>>>>> Bo
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>


Re: Use SQL Script to Write Spark SQL Jobs

2017-06-13 Thread bo yang
Thanks Benjamin and Ayan for the feedback! You kind of represent the two groups
of people who either need such a script tool or don't. Personally I find the script
very useful for writing my own ETL pipelines and daily jobs. Let's see
whether there are other people interested in such a project.

Best,
Bo





On Mon, Jun 12, 2017 at 11:26 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> IMHO, this approach is not very useful.
>
> Firstly, 2 use cases mentioned in the project page:
>
> 1. Simplify spark development - I think the only thing can be done there
> is to come up with some boilerplate function, which essentially will take a
> sql and come back with a temp table name and a corresponding DF (Remember
> the project targets structured data sources only, not streaming or RDD).
> Building another mini-DSL on top of already fairly elaborate spark API
> never appealed to me.
>
> 2. Business Analysts using Spark - single word answer is Notebooks. Take
> your pick - Jupyter, Zeppelin, Hue.
>
> The case of "Spark is for Developers", IMHO, stemmed to the
> packaging/building overhead of spark apps. For Python users, this barrier
> is considerably lower (And maybe that is why I do not see a prominent
> need).
>
> But I can imagine the pain of a SQL developer coming into a scala/java
> world. I came from a hardcore SQL/DWH environment where I used to write SQL
> and SQL only. So SBT or MVN are still not my friend. Maybe someday they
> will. But learned them hard way, just because the value of using spark can
> offset the pain long long way. So, I think there is a need of spending time
> with the environment to get comfortable with it. And maybe, just maybe,
> using Nifi in case you miss drag/drop features too much :)
>
> But, these are my 2c, and sincerely humble opinion, and I wish you all the
> luck for your project.
>
> On Tue, Jun 13, 2017 at 3:23 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>
>> Hi Bo,
>>
>> +1 for your project. I come from the world of data warehouses, ETL, and
>> reporting analytics. There are many individuals who do not know or want to
>> do any coding. They are content with ANSI SQL and stick to it. ETL
>> workflows are also done without any coding using a drag-and-drop user
>> interface, such as Talend, SSIS, etc. There is a small amount of scripting
>> involved but not too much. I looked at what you are trying to do, and I
>> welcome it. This could open up Spark to the masses and shorten development
>> times.
>>
>> Cheers,
>> Ben
>>
>>
>> On Jun 12, 2017, at 10:14 PM, bo yang <bobyan...@gmail.com> wrote:
>>
>> Hi Aakash,
>>
>> Thanks for your willing to help :) It will be great if I could get more
>> feedback on my project. For example, is there any other people feeling the
>> need of using a script to write Spark job easily? Also, I would explore
>> whether it is possible that the Spark project takes some work to build such
>> a script based high level DSL.
>>
>> Best,
>> Bo
>>
>>
>> On Mon, Jun 12, 2017 at 12:14 PM, Aakash Basu <aakash.spark@gmail.com
>> > wrote:
>>
>>> Hey,
>>>
>>> I work on Spark SQL and would pretty much be able to help you in this.
>>> Let me know your requirement.
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 12-Jun-2017 11:00 AM, "bo yang" <bobyan...@gmail.com> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> I am writing a small open source project
>>>> <https://github.com/uber/uberscriptquery> to use SQL Script to write
>>>> Spark Jobs. Want to see if there are other people interested to use or
>>>> contribute to this project.
>>>>
>>>> The project is called UberScriptQuery (https://githu
>>>> b.com/uber/uberscriptquery). Sorry for the dumb name to avoid conflict
>>>> with many other names (Spark is registered trademark, thus I could not use
>>>> Spark in my project name).
>>>>
>>>> In short, it is a high level SQL-like DSL (Domain Specific Language) on
>>>> top of Spark. People can use that DSL to write Spark jobs without worrying
>>>> about Spark internal details. Please check README
>>>> <https://github.com/uber/uberscriptquery> in the project to get more
>>>> details.
>>>>
>>>> It will be great if I could get any feedback or suggestions!
>>>>
>>>> Best,
>>>> Bo
>>>>
>>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Use SQL Script to Write Spark SQL Jobs

2017-06-12 Thread bo yang
Hi Aakash,

Thanks for your willingness to help :) It would be great if I could get more
feedback on my project. For example, are there any other people feeling the
need for a script to write Spark jobs easily? Also, I would explore
whether it is possible for the Spark project to take on some work to build such
a script-based high-level DSL.

Best,
Bo


On Mon, Jun 12, 2017 at 12:14 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hey,
>
> I work on Spark SQL and would pretty much be able to help you in this. Let
> me know your requirement.
>
> Thanks,
> Aakash.
>
> On 12-Jun-2017 11:00 AM, "bo yang" <bobyan...@gmail.com> wrote:
>
>> Hi Guys,
>>
>> I am writing a small open source project
>> <https://github.com/uber/uberscriptquery> to use SQL Script to write
>> Spark Jobs. Want to see if there are other people interested to use or
>> contribute to this project.
>>
>> The project is called UberScriptQuery (https://githu
>> b.com/uber/uberscriptquery). Sorry for the dumb name to avoid conflict
>> with many other names (Spark is registered trademark, thus I could not use
>> Spark in my project name).
>>
>> In short, it is a high level SQL-like DSL (Domain Specific Language) on
>> top of Spark. People can use that DSL to write Spark jobs without worrying
>> about Spark internal details. Please check README
>> <https://github.com/uber/uberscriptquery> in the project to get more
>> details.
>>
>> It will be great if I could get any feedback or suggestions!
>>
>> Best,
>> Bo
>>
>>


Use SQL Script to Write Spark SQL Jobs

2017-06-11 Thread bo yang
Hi Guys,

I am writing a small open source project to use SQL scripts to write Spark
jobs. I want to see if there are other people interested in using or contributing
to this project.

The project is called UberScriptQuery (
https://github.com/uber/uberscriptquery). Sorry for the dumb name; it avoids
conflicts with many other names (Spark is a registered trademark, thus I could
not use Spark in my project name).

In short, it is a high-level SQL-like DSL (Domain Specific Language) on top
of Spark. People can use that DSL to write Spark jobs without worrying
about Spark internal details. Please check the README in the project to get more
details.

It would be great if I could get any feedback or suggestions!

Best,
Bo
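For readers wondering what driving a Spark job from a sequence of SQL statements can look like without any extra framework, here is a hand-rolled sketch (this is not the UberScriptQuery DSL itself; see its README for the real syntax):

```
import org.apache.spark.sql.{DataFrame, SparkSession}

// Run named SQL steps in order; each result is registered as a temp view
// so later steps can refer to earlier ones by name. Returns the last result.
def runScript(spark: SparkSession, steps: Seq[(String, String)]): DataFrame = {
  steps.map { case (viewName, sql) =>
    val df = spark.sql(sql)
    df.createOrReplaceTempView(viewName)
    df
  }.last
}

// Usage sketch (table and column names are placeholders):
// val result = runScript(spark, Seq(
//   "daily" -> "SELECT * FROM events WHERE dt = '2017-06-11'",
//   "top10" -> "SELECT user, count(*) AS c FROM daily GROUP BY user ORDER BY c DESC LIMIT 10"
// ))
```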


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
ah.. thanks, your code also works for me. I figured it's because I tried
to encode a tuple of (MyClass, Int):


package org.apache.spark

/**
  */

import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoders, SQLContext}


object Hello {
  // this class has to be OUTSIDE the method that calls it!! otherwise
gives error about typetag not found
  // the UDT stuff from
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
  // and 
http://stackoverflow.com/questions/32440461/how-to-define-schema-for-custom-type-in-spark-sql
  class Person4 {
@scala.beans.BeanProperty def setX(x:Int): Unit = {}
@scala.beans.BeanProperty def getX():Int = {1}
  }

  def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file
on your system
val conf = new
SparkConf().setMaster("local[*]").setAppName("Simple Application")
val sc = new SparkContext(conf)

val raw = Array((new Person4(), 1), (new Person4(), 1))
val myrdd = sc.parallelize(raw)

val sqlContext = new SQLContext(sc)

implicit val personEncoder = Encoders.bean[Person4](classOf[Person4])
implicit val personEncoder2 = Encoders.tuple(personEncoder, Encoders.INT)


import sqlContext.implicits._
// this works:
Seq(new Person4(), new Person4()).toDS()

// this doesn't:
Seq((new Person4(),1), (new Person4(),1)).toDS()


sc.stop()
  }
}


On Tue, May 9, 2017 at 1:37 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Must be a bug.  This works for me
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/908554720841389/2840265927289860/latest.html>
>  in
> Spark 2.1.
>
> On Tue, May 9, 2017 at 12:10 PM, Yang <tedd...@gmail.com> wrote:
>
>> somehow the schema check is here
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/
>> src/main/scala/org/apache/spark/sql/catalyst/ScalaReflec
>> tion.scala#L697-L750
>>
>> supposedly beans are to be handled, but it's not clear to me which line
>> handles the type of beans. if that's clear, I could probably annotate my
>> bean class properly
>>
>> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> I think you are supposed to set BeanProperty on a var as they do here
>>> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
>>> If you are using scala though I'd consider using the case class encoders.
>>>
>>> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>>>
>>>> I'm trying to use Encoders.bean() to create an encoder for my custom
>>>> class, but it fails complaining about can't find the schema:
>>>>
>>>>
>>>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>>>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>>>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>>>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd
>>>> : org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1]
>>>> at parallelize at :31 scala> sqlcontext.createDataFrame(per
>>>> son_rdd) java.lang.UnsupportedOperationException: Schema for type
>>>> Person4 is not supported at org.apache.spark.sql.catalyst.
>>>> ScalaReflection$.schemaFor(ScalaReflection.scala:716) at org.apache.
>>>> spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(
>>>> ScalaReflection.scala:71 2) at org.apache.spark.sql.catalyst.
>>>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 1)
>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(Traver
>>>> sableLike.scala:234) at
>>>>
>>>>
>>>> but if u look at the encoder's schema, it does know it:
>>>> but the system does seem to understand the schema for "Person4":
>>>>
>>>>
>>>> scala> personEncoder.schema
>>>> res38: org.apache.spark.sql.types.StructType = 
>>>> StructType(StructField(x,IntegerType,false))
>>>>
>>>>
>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
somehow the schema check is here

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L697-L750

supposedly beans are to be handled, but it's not clear to me which line
handles the type of beans. if that's clear, I could probably annotate my
bean class properly

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails complaining about can't find the schema:
>>
>>
>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
>> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
>> parallelize at :31 scala> sqlcontext.createDataFrame(person_rdd
>> ) java.lang.UnsupportedOperationException: Schema for type Person4 is not
>> supported at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:716) at org.apache.spark.sql.catalyst.
>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 2)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.
>> apply(ScalaReflection.scala:71 1) at scala.collection.TraversableLi
>> ke$$anonfun$map$1.apply(TraversableLike.scala:234) at
>>
>>
>> but if u look at the encoder's schema, it does know it:
>> but the system does seem to understand the schema for "Person4":
>>
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType = 
>> StructType(StructField(x,IntegerType,false))
>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
2.0.2 with scala 2.11

On Tue, May 9, 2017 at 11:30 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Which version of Spark?
>
> On Tue, May 9, 2017 at 11:28 AM, Yang <tedd...@gmail.com> wrote:
>
>> actually with var it's the same:
>>
>>
>> scala> class Person4 {
>>  |
>>  | @scala.beans.BeanProperty var X:Int = 1
>>  | }
>> defined class Person4
>>
>> scala> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>> personEncoder: org.apache.spark.sql.Encoder[Person4] = class[x[0]: int]
>>
>> scala> val person_rdd =sc.parallelize(Array( (new Person4(), 1), (new
>> Person4(), 2) ))
>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] =
>> ParallelCollectionRDD[3] at parallelize at :39
>>
>> scala> sqlContext.createDataFrame(person_rdd)
>> java.lang.UnsupportedOperationException: Schema for type Person4 is not
>> supported
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:716)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schem
>> aFor$2.apply(ScalaReflection.scala:712)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schem
>> aFor$2.apply(ScalaReflection.scala:711)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:234)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:234)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at scala.collection.TraversableLike$class.map(TraversableLike.
>> scala:234)
>>   at scala.collection.immutable.List.map(List.scala:285)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:711)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:654)
>>   at org.apache.spark.sql.SparkSession.createDataFrame(SparkSessi
>> on.scala:251)
>>   at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.
>> scala:278)
>>   ... 54 elided
>>
>> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> I think you are supposed to set BeanProperty on a var as they do here
>>> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
>>> If you are using scala though I'd consider using the case class encoders.
>>>
>>> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>>>
>>>> I'm trying to use Encoders.bean() to create an encoder for my custom
>>>> class, but it fails complaining about can't find the schema:
>>>>
>>>>
>>>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>>>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>>>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>>>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd
>>>> : org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1]
>>>> at parallelize at :31 scala> sqlcontext.createDataFrame(per
>>>> son_rdd) java.lang.UnsupportedOperationException: Schema for type
>>>> Person4 is not supported at org.apache.spark.sql.catalyst.
>>>> ScalaReflection$.schemaFor(ScalaReflection.scala:716) at org.apache.
>>>> spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(
>>>> ScalaReflection.scala:71 2) at org.apache.spark.sql.catalyst.
>>>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 1)
>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(Traver
>>>> sableLike.scala:234) at
>>>>
>>>>
>>>> but if u look at the encoder's schema, it does know it:
>>>> but the system does seem to understand the schema for "Person4":
>>>>
>>>>
>>>> scala> personEncoder.schema
>>>> res38: org.apache.spark.sql.types.StructType = 
>>>> StructType(StructField(x,IntegerType,false))
>>>>
>>>>
>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
actually with var it's the same:


scala> class Person4 {
 |
 | @scala.beans.BeanProperty var X:Int = 1
 | }
defined class Person4

scala> val personEncoder = Encoders.bean[Person4](classOf[Person4])
personEncoder: org.apache.spark.sql.Encoder[Person4] = class[x[0]: int]

scala> val person_rdd =sc.parallelize(Array( (new Person4(), 1), (new
Person4(), 2) ))
person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] =
ParallelCollectionRDD[3] at parallelize at :39

scala> sqlContext.createDataFrame(person_rdd)
java.lang.UnsupportedOperationException: Schema for type Person4 is not
supported
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
  at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
  at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
  at
org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:251)
  at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:278)
  ... 54 elided

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails complaining about can't find the schema:
>>
>>
>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
>> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
>> parallelize at :31 scala> sqlcontext.createDataFrame(person_rdd
>> ) java.lang.UnsupportedOperationException: Schema for type Person4 is not
>> supported at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:716) at org.apache.spark.sql.catalyst.
>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 2)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.
>> apply(ScalaReflection.scala:71 1) at scala.collection.TraversableLi
>> ke$$anonfun$map$1.apply(TraversableLike.scala:234) at
>>
>>
>> but if u look at the encoder's schema, it does know it:
>> but the system does seem to understand the schema for "Person4":
>>
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType = 
>> StructType(StructField(x,IntegerType,false))
>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
Thanks Michael.

I could not use a case class here since I need to later modify the output of
getX() so that the output is dynamically generated.

The bigger context is this:
I want to implement topN(), using a BoundedPriorityQueue. Basically I
include a queue in reduce(), or aggregateByKey(), but the only available
serializer is Kryo, and it's extremely slow in this case because
BoundedPriorityQueue probably has a lot of internal fields.

So I want to wrap the queue in a wrapper class, and only export the queue
content through getContent() and setContent(), where the content is a list of
tuples. This way, when I encode the wrapper, the bean encoder simply encodes
the getContent() output, I think. Encoding a list of tuples is very fast.

Yang
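A rough sketch of that wrapper idea (names are hypothetical; a plain scala.collection PriorityQueue stands in for Spark's private BoundedPriorityQueue, and the merge logic for combining partial results is omitted):

```
import scala.beans.BeanProperty
import scala.collection.mutable

// Keep the queue @transient so only the flat content array is (de)serialized.
class TopNWrapper extends Serializable {
  @BeanProperty var n: Int = 10
  @BeanProperty var content: Array[Double] = Array.empty   // what the encoder sees

  @transient private lazy val queue =
    mutable.PriorityQueue.empty[Double](Ordering[Double].reverse)  // smallest element on top

  def add(x: Double): this.type = {
    queue.enqueue(x)
    if (queue.size > n) queue.dequeue()   // drop the smallest, keep the top n
    content = queue.toArray
    this
  }
}
```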

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails complaining about can't find the schema:
>>
>>
>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
>> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
>> parallelize at :31 scala> sqlcontext.createDataFrame(person_rdd
>> ) java.lang.UnsupportedOperationException: Schema for type Person4 is not
>> supported at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:716) at org.apache.spark.sql.catalyst.
>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 2)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.
>> apply(ScalaReflection.scala:71 1) at scala.collection.TraversableLi
>> ke$$anonfun$map$1.apply(TraversableLike.scala:234) at
>>
>>
>> but if u look at the encoder's schema, it does know it:
>> but the system does seem to understand the schema for "Person4":
>>
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType = 
>> StructType(StructField(x,IntegerType,false))
>>
>>
>


how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
I'm trying to use Encoders.bean() to create an encoder for my custom class,
but it fails, complaining that it can't find the schema:


class Person4 {
  @scala.beans.BeanProperty def setX(x: Int): Unit = {}
  @scala.beans.BeanProperty def getX(): Int = { 1 }
}

val personEncoder = Encoders.bean[Person4](classOf[Person4])

scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at :31

scala> sqlcontext.createDataFrame(person_rdd)
java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at ...


But if you look at the encoder's schema, it does know it;
the system does seem to understand the schema for "Person4":


scala> personEncoder.schema
res38: org.apache.spark.sql.types.StructType =
StructType(StructField(x,IntegerType,false))
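As suggested in the replies above, the simplest route in Scala is usually a case class, for which Spark derives the schema automatically; a minimal sketch:

```
import spark.implicits._   // spark: the SparkSession (sqlContext.implicits._ on older shells)

case class Person(x: Int)

// Product types (case classes and tuples of them) get encoders derived automatically.
val ds = Seq((Person(1), 1), (Person(2), 2)).toDS()
ds.printSchema()   // a struct column for Person plus an integer column
```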


Re: Graph Analytics on HBase with HGraphDB and Spark GraphFrames

2017-04-03 Thread Weiqing Yang
Thanks for sharing this.


On Sun, Apr 2, 2017 at 7:08 PM, Irving Duran  wrote:

> Thanks for the share!
>
>
> Thank You,
>
> Irving Duran
>
> On Sun, Apr 2, 2017 at 7:19 PM, Felix Cheung 
> wrote:
>
>> Interesting!
>>
>> --
>> *From:* Robert Yokota 
>> *Sent:* Sunday, April 2, 2017 9:40:07 AM
>> *To:* user@spark.apache.org
>> *Subject:* Graph Analytics on HBase with HGraphDB and Spark GraphFrames
>>
>> Hi,
>>
>> In case anyone is interested in analyzing graphs in HBase with Apache
>> Spark GraphFrames, this might be helpful:
>>
>> https://yokota.blog/2017/04/02/graph-analytics-on-hbase-with
>> -hgraphdb-and-spark-graphframes/
>>
>
>


Re: Re: how to call recommend method from ml.recommendation.ALS

2017-03-15 Thread Yuhao Yang
This is something that was just added to ML and will probably be released
with 2.2. For now you can try to copy from the master code:
https://github.com/apache/spark/blob/70f9d7f71c63d2b1fdfed75cb7a59285c272a62b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L352
and give it a try.

Yuhao
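Once on Spark 2.2+, the ml-side equivalents look roughly like this (column names and the `ratings` DataFrame are placeholders):

```
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.DataFrame

// ratings: DataFrame with userId / itemId / rating columns (placeholder names)
def recommend(ratings: DataFrame) = {
  val als = new ALS()
    .setUserCol("userId")
    .setItemCol("itemId")
    .setRatingCol("rating")

  val model = als.fit(ratings)
  val topItemsPerUser = model.recommendForAllUsers(10)   // ml analogue of mllib's recommendProducts
  val topUsersPerItem = model.recommendForAllItems(10)
  (topItemsPerUser, topUsersPerItem)
}
```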

2017-03-15 21:39 GMT-07:00 lk_spark :

> Thanks for your reply. What I exactly want to know is:
> in package mllib.recommendation, MatrixFactorizationModel has methods
> like recommendProducts, but I didn't find them in package ml.recommendation.
> How can I do the same thing as in mllib when I use ml?
> 2017-03-16
> --
> lk_spark
> --
>
> *发件人:*任弘迪 
> *发送时间:*2017-03-16 10:46
> *主题:*Re: how to call recommend method from ml.recommendation.ALS
> *收件人:*"lk_spark"
> *抄送:*"user.spark"
>
> If the number of user-item pairs to predict isn't too large, say millions,
> you could transform the target dataframe and save the result to a Hive
> table, then build a cache based on that table for online services.
>
> If that's not the case (such as billions of user-item pairs to predict), you
> have to start a service with the model loaded, send the user to the service,
> first match several hundred items from all available items (which could
> itself be another service or cache), then transform this user and all items
> using the model to get predictions, and return items ordered by prediction.
>
> On Thu, Mar 16, 2017 at 9:32 AM, lk_spark  wrote:
>
>> Hi all:
>> Under Spark 2.0, I want to know how I can do the recommend action after
>> training a ml.recommendation.ALSModel.
>>
>> I tried to save the model and load it with MatrixFactorizationModel
>> but got an error.
>>
>> 2017-03-16
>> --
>> lk_spark
>>
>
>


Re: [MLlib] kmeans random initialization, same seed every time

2017-03-15 Thread Yuhao Yang
Hi Julian,

Thanks for reporting this. This is a valid issue and I created
https://issues.apache.org/jira/browse/SPARK-19957 to track it.

Right now the seed is set to this.getClass.getName.hashCode.toLong by
default, which indeed stays the same across multiple fits. Feel free to
leave your comments or send a PR for the fix.

For your problem, you may add .setSeed(new Random().nextLong()) before
fit() as a workaround.

Thanks,
Yuhao
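Concretely, the workaround looks like this (k and the `dataset` DataFrame are placeholders):

```
import scala.util.Random
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.DataFrame

// dataset: DataFrame with a "features" vector column (placeholder)
def fitWithRandomSeed(dataset: DataFrame) = {
  val kmeans = new KMeans()
    .setK(10)
    .setInitMode("random")
    .setSeed(Random.nextLong())   // explicit per-run seed instead of the fixed default
  kmeans.fit(dataset)
}
```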

2017-03-14 5:46 GMT-07:00 Julian Keppel :

> I'm sorry, I missed some important information. I use Spark version 2.0.2
> with Scala 2.11.8.
>
> 2017-03-14 13:44 GMT+01:00 Julian Keppel :
>
>> Hi everybody,
>>
>> I am doing some experiments with the Spark k-means implementation of the new
>> DataFrame API. I compare clustering results of different runs with
>> different parameters. I noticed that for the random initialization mode, the
>> seed value is the same every time. How is it calculated? In my
>> understanding the seed should be random if it is not provided by the user.
>>
>> Thank you for you help.
>>
>> Julian
>>
>
>


Re: how to construct parameter for model.transform() from datafile

2017-03-14 Thread Yuhao Yang
Hi Jinhong,


Based on the error message, your second collection of vectors has a
dimension of 804202, while the dimension of your training vectors
was 144109. So please make sure your test dataset is of the same dimension
as the training data.

From the test dataset you posted, the vector dimension is much larger
than 144109 (804202?).

Regards,
Yuhao
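A sketch of pinning the vector size to the training dimension (the filter simply drops out-of-range indices, which assumes those features were never seen at training time; ideally the featurization itself should be shared between training and scoring):

```
import org.apache.spark.ml.linalg.{Vector, Vectors}

val numFeatures = 144109   // must match the dimension used at training time

def lineToVector(line: String): Vector = {
  val entries = line.split(" ").map { s =>
    val Array(i, v) = s.split(":")
    (i.toInt, v.toDouble)
  }.filter(_._1 < numFeatures)          // indices must be < the declared size
  Vectors.sparse(numFeatures, entries)
}
```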


2017-03-13 4:59 GMT-07:00 jinhong lu :

> Anyone help?
>
> > 在 2017年3月13日,19:38,jinhong lu  写道:
> >
> > After train the mode, I got the result look like this:
> >
> >
> >   scala>  predictionResult.show()
> >   +-+++---
> -+--+
> >   |label|features|   rawPrediction|
>  probability|prediction|
> >   +-+++---
> -+--+
> >   |  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|
>  0.0|
> >   |  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|
>  0.0|
> >   |  0.0|(144109,[100],[24...|[-146.81612388602...|[9.73704654529197...|
>  1.0|
> >
> > And then, I transform() the data by these code:
> >
> >   import org.apache.spark.ml.linalg.Vectors
> >   import org.apache.spark.ml.linalg.Vector
> >   import scala.collection.mutable
> >
> >  def lineToVector(line:String ):Vector={
> >   val seq = new mutable.Queue[(Int,Double)]
> >   val content = line.split(" ");
> >   for( s <- content){
> > val index = s.split(":")(0).toInt
> > val value = s.split(":")(1).toDouble
> >  seq += ((index,value))
> >   }
> >   return Vectors.sparse(144109, seq)
> > }
> >
> >val df = sc.sequenceFile[org.apache.hadoop.io.LongWritable,
> org.apache.hadoop.io.Text]("/data/gamein/gameall_sdc/wh/
> gameall.db/edt_udid_label_format/ds=20170312/001006_0").map(line=>line._2).map(line
> => 
> (line.toString.split("\t")(0),lineToVector(line.toString.split("\t")(1.toDF("udid",
> "features")
> >val predictionResult = model.transform(df)
> >predictionResult.show()
> >
> >
> > But I got the error look like this:
> >
> > Caused by: java.lang.IllegalArgumentException: requirement failed: You
> may not write an element to index 804201 because the declared size of your
> vector is 144109
> >  at scala.Predef$.require(Predef.scala:224)
> >  at org.apache.spark.ml.linalg.Vectors$.sparse(Vectors.scala:219)
> >  at lineToVector(:55)
> >  at $anonfun$4.apply(:50)
> >  at $anonfun$4.apply(:50)
> >  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(generated.java:84)
> >  at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> >  at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> >  at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:246)
> >  at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:240)
> >  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:803)
> >  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:803)
> >
> > So I change
> >
> >   return Vectors.sparse(144109, seq)
> >
> > to
> >
> >   return Vectors.sparse(804202, seq)
> >
> > Another error occurs:
> >
> >   Caused by: java.lang.IllegalArgumentException: requirement
> failed: The columns of A don't match the number of elements of x. A:
> 144109, x: 804202
> > at scala.Predef$.require(Predef.scala:224)
> > at org.apache.spark.ml.linalg.BLAS$.gemv(BLAS.scala:521)
> > at org.apache.spark.ml.linalg.Matrix$class.multiply(
> Matrices.scala:110)
> > at org.apache.spark.ml.linalg.DenseMatrix.multiply(Matrices.
> scala:176)
> >
> > what should I do?
> >> 在 2017年3月13日,16:31,jinhong lu  写道:
> >>
> >> Hi, all:
> >>
> >> I got these training data:
> >>
> >>  0 31607:17
> >>  0 111905:36
> >>  0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1
> 30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2
> 112109:4 123305:48 142509:1
> >>  0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
> >>  0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3
> 15207:19 31607:19
> >>  0 19109:7 29705:4 123305:32
> >>  0 15309:1 43005:1 108509:1
> >>  1 604:1 6401:1 6503:1 15207:4 31607:40
> >>  0 1807:19
> >>  0 301:14 501:1 1502:14 2507:12 123305:4
> >>  0 607:14 19109:460 123305:448
> >>  0 5406:14 7209:4 10509:3 19109:6 

Re: FPGrowth Model is taking too long to generate frequent item sets

2017-03-14 Thread Yuhao Yang
Hi Raju,

Have you tried setNumPartitions with a larger number?
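For reference, both knobs on the mllib FPGrowth look like this (the numbers are placeholders to tune):

```
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

// transactions: RDD[Array[String]], one item array per transaction
def mineFrequentItemsets(transactions: RDD[Array[String]]) = {
  val fpg = new FPGrowth()
    .setMinSupport(0.001)     // itemsets must appear in at least 0.1% of transactions
    .setNumPartitions(200)    // spread the conditional FP-tree work across more tasks
  fpg.run(transactions)
}
```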

2017-03-07 0:30 GMT-08:00 Eli Super :

> Hi
>
> It's area of knowledge , you will need to read online several hours about
> it
>
> What is your programming language ?
>
> Try search online : "machine learning binning %my_programing_langauge%"
> and
> "machine learning feature engineering %my_programing_langauge%"
>
> On Tue, Mar 7, 2017 at 3:39 AM, Raju Bairishetti  wrote:
>
>> @Eli, Thanks for the suggestion. If you do not mind can you please
>> elaborate approaches?
>>
>> On Mon, Mar 6, 2017 at 7:29 PM, Eli Super  wrote:
>>
>>> Hi
>>>
>>> Try to implement binning and/or feature engineering (smart feature
>>> selection for example)
>>>
>>> Good luck
>>>
>>> On Mon, Mar 6, 2017 at 6:56 AM, Raju Bairishetti 
>>> wrote:
>>>
 Hi,
   I am new to Spark ML Lib. I am using FPGrowth model for finding
 related items.

 Number of transactions are 63K and the total number of items in all
 transactions are 200K.

 I am running FPGrowth model to generate frequent items sets. It is
 taking huge amount of time to generate frequent itemsets.* I am
 setting min-support value such that each item appears in at least ~(number
 of items)/(number of transactions).*

 It is taking lots of time in case If I say item can appear at least
 once in the database.

 If I give higher value to min-support then output is very smaller.

 Could anyone please guide me how to reduce the execution time for
 generating frequent items?

 --
 Thanks,
 Raju Bairishetti,
 www.lazada.com

>>>
>>>
>>
>>
>> --
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>


Sharing my DataFrame (DataSet) cheat sheet.

2017-03-04 Thread Yuhao Yang
Sharing some snippets I accumulated while developing with Apache Spark
DataFrame (Dataset). Hope it can help you in some way.

https://github.com/hhbyyh/DataFrameCheatSheet.

[image: inline image 1]





Regards,
Yuhao Yang


Re: physical memory usage keep increasing for spark app on Yarn

2017-02-15 Thread Yang Cao
Hi Pavel!

Sorry for the late reply. I did some investigation over these days with my colleague.
Here is my thought: since Spark 1.2, Netty with off-heap memory is used to reduce
GC during shuffle and cache block transfer. In my case, if I try to increase
the memory overhead enough, I will get the max direct buffer exception. When
Netty does block transferring, there are five threads by default to grab the
data chunks for the target executor. In my situation, one single chunk is too big to
fit into the buffer, so GC won't help here. My final solution is to do another
repartition before the repartition(1), just to make 10x more partitions
than the original. In this way, I can reduce the size of each chunk Netty
transfers.

Also I want to say that it's not a good choice to repartition a big dataset
into a single file. This extremely unbalanced scenario is kind of a waste of your
compute resources.

I don't know whether my explanation is right. Please correct me if you find any
issues. Thanks!

Best,
Yang
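In code, the workaround amounts to something like this, mirroring the job quoted below (the 4000 is the "10x more partitions" knob and is a placeholder; originpath, ts and targetpath are as in the original job):

```
import org.apache.spark.sql.SaveMode

val df = spark.read.parquet(originpath)
  .filter(s"m = ${ts.month} AND d = ${ts.day}")
  .coalesce(400)

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")

dropDF
  .repartition(4000)   // many smaller shuffle blocks, so no single Netty chunk is huge
  .repartition(1)      // then collapse to a single output file
  .write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
```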
>  On Jan 23, 2017, at 18:03, Pavel Plotnikov <pavel.plotni...@team.wrike.com> 
> wrote:
> 
> Hi Yang!
> 
> I don't know exactly why this happen, but i think GC can't work to fast 
> enough or size of data with additional objects created while computations to 
> big for executor. 
> And i found that this problem only if you make some data manipulations. You 
> can cache you data first, after that, write in one partiton.
> For example  
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.cache()
> or
> dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)
> val dropDF = spark.read.parquet(temppath)
> and then
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> Best,
> 
> On Sun, Jan 22, 2017 at 12:31 PM Yang Cao <cybea...@gmail.com 
> <mailto:cybea...@gmail.com>> wrote:
> Also, do you know why this happen? 
> 
>> On Jan 20, 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com 
>> <mailto:pavel.plotni...@team.wrike.com>> wrote:
>> 
> 
>> Hi Yang,
>> i have faced with the same problem on Mesos and to circumvent this issue i 
>> am usually increase partition number. On last step in your code you reduce 
>> number of partitions to 1, try to set bigger value, may be it solve this 
>> problem.
>> 
>> Cheers,
>> Pavel
>> 
>> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com 
>> <mailto:cybea...@gmail.com>> wrote:
>> Hi all,
>> 
>> I am running a spark application on YARN-client mode with 6 executors (each 
>> 4 cores and executor memory = 6G and Overhead = 4G, spark version: 1.6.3 / 
>> 2.1.0). I find that my executor memory keeps increasing until get killed by 
>> node manager; and give out the info that tells me to boost 
>> spark.yarn.excutor.memoryOverhead. I know that this param mainly control the 
>> size of memory allocated off-heap. But I don’t know when and how the spark 
>> engine will use this part of memory. Also increase that part of memory not 
>> always solve my problem. sometimes works sometimes not. It trends to be 
>> useless when the input data is large.
>> 
>> FYI, my app’s logic is quite simple. It means to combine the small files 
>> generated in one single day (one directory one day) into a single one and 
>> write back to hdfs. Here is the core code:
>> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
>> ${ts.day}").coalesce(400)
>> val dropDF = 
>> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>> The source file may have hundreds to thousands level’s partition. And the 
>> total parquet file is around 1to 5 gigs. Also I find that in the step that 
>> shuffle reading data from different machines, The size of shuffle read is 
>> about 4 times larger than the input size, Which is wired or some principle I 
>> don’t know. 
>> 
>> Anyway, I have done some search myself for this problem. Some article said 
>> that it’s on the direct buffer memory (I don’t set myself). Some article 
>> said that people solve it with more frequent full GC. Also I find one people 
>> on SO with very similar situation: 
>> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
>>  
>> <http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn>
>> This guy claimed that it's a bug with parquet but a comment questioned him.

Re: physical memory usage keep increasing for spark app on Yarn

2017-01-22 Thread Yang Cao
Also, do you know why this happens? 
> On 20 Jan 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com> 
> wrote:
> 
> Hi Yang,
> i have faced with the same problem on Mesos and to circumvent this issue i am 
> usually increase partition number. On last step in your code you reduce 
> number of partitions to 1, try to set bigger value, may be it solve this 
> problem.
> 
> Cheers,
> Pavel
> 
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com 
> <mailto:cybea...@gmail.com>> wrote:
> Hi all,
> 
> I am running a spark application on YARN-client mode with 6 executors (each 4 
> cores and executor memory = 6G and Overhead = 4G, spark version: 1.6.3 / 
> 2.1.0). I find that my executor memory keeps increasing until get killed by 
> node manager; and give out the info that tells me to boost 
> spark.yarn.excutor.memoryOverhead. I know that this param mainly control the 
> size of memory allocated off-heap. But I don’t know when and how the spark 
> engine will use this part of memory. Also increase that part of memory not 
> always solve my problem. sometimes works sometimes not. It trends to be 
> useless when the input data is large.
> 
> FYI, my app’s logic is quite simple. It means to combine the small files 
> generated in one single day (one directory one day) into a single one and 
> write back to hdfs. Here is the core code:
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
> ${ts.day}").coalesce(400)
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> The source file may have hundreds to thousands level’s partition. And the 
> total parquet file is around 1to 5 gigs. Also I find that in the step that 
> shuffle reading data from different machines, The size of shuffle read is 
> about 4 times larger than the input size, Which is wired or some principle I 
> don’t know. 
> 
> Anyway, I have done some search myself for this problem. Some article said 
> that it’s on the direct buffer memory (I don’t set myself). Some article said 
> that people solve it with more frequent full GC. Also I find one people on SO 
> with very similar situation: 
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
>  
> <http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn>
> This guy claimed that it’s a bug with parquet but comment questioned him. 
> People in this mail list may also receive an email hours ago from blondowski 
> who described this problem while writing json: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>  
> <http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none>
> 
> So it looks like to be common question for different output format. I hope 
> someone with experience about this problem could make an explanation about 
> this issue. Why this happen and what is a reliable way to solve this problem. 
> 
> Best,
> 
> 



Re: physical memory usage keep increasing for spark app on Yarn

2017-01-22 Thread Yang Cao
Hi,
Thank you for your suggestion. As far as I know, if I set a bigger number I won't get 
the output as a single file, right? My task is designed to combine all the small files 
of one day into one big parquet file. Thanks again.

Best,
> On 20 Jan 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com> 
> wrote:
> 
> Hi Yang,
> i have faced with the same problem on Mesos and to circumvent this issue i am 
> usually increase partition number. On last step in your code you reduce 
> number of partitions to 1, try to set bigger value, may be it solve this 
> problem.
> 
> Cheers,
> Pavel
> 
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com 
> <mailto:cybea...@gmail.com>> wrote:
> Hi all,
> 
> I am running a spark application on YARN-client mode with 6 executors (each 4 
> cores and executor memory = 6G and Overhead = 4G, spark version: 1.6.3 / 
> 2.1.0). I find that my executor memory keeps increasing until get killed by 
> node manager; and give out the info that tells me to boost 
> spark.yarn.excutor.memoryOverhead. I know that this param mainly control the 
> size of memory allocated off-heap. But I don’t know when and how the spark 
> engine will use this part of memory. Also increase that part of memory not 
> always solve my problem. sometimes works sometimes not. It trends to be 
> useless when the input data is large.
> 
> FYI, my app’s logic is quite simple. It means to combine the small files 
> generated in one single day (one directory one day) into a single one and 
> write back to hdfs. Here is the core code:
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
> ${ts.day}").coalesce(400)
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> The source file may have hundreds to thousands level’s partition. And the 
> total parquet file is around 1to 5 gigs. Also I find that in the step that 
> shuffle reading data from different machines, The size of shuffle read is 
> about 4 times larger than the input size, Which is wired or some principle I 
> don’t know. 
> 
> Anyway, I have done some search myself for this problem. Some article said 
> that it’s on the direct buffer memory (I don’t set myself). Some article said 
> that people solve it with more frequent full GC. Also I find one people on SO 
> with very similar situation: 
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
>  
> <http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn>
> This guy claimed that it’s a bug with parquet but comment questioned him. 
> People in this mail list may also receive an email hours ago from blondowski 
> who described this problem while writing json: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>  
> <http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none>
> 
> So it looks like to be common question for different output format. I hope 
> someone with experience about this problem could make an explanation about 
> this issue. Why this happen and what is a reliable way to solve this problem. 
> 
> Best,
> 
> 



physical memory usage keep increasing for spark app on Yarn

2017-01-20 Thread Yang Cao
Hi all,

I am running a Spark application in YARN-client mode with 6 executors (4 cores each, 
executor memory = 6G, overhead = 4G; Spark version: 1.6.3 / 2.1.0). I find that my 
executor memory keeps increasing until the executor gets killed by the node manager, 
with a message telling me to boost spark.yarn.executor.memoryOverhead. I know that this 
parameter mainly controls the size of memory allocated off-heap, but I don't know when 
and how the Spark engine uses this part of memory. Also, increasing that memory does not 
always solve my problem: sometimes it works, sometimes it does not, and it tends to be 
useless when the input data is large.

FYI, my app's logic is quite simple: it combines the small files generated in a single 
day (one directory per day) into one file and writes it back to HDFS. Here is the core code:

val df = spark.read.parquet(originpath)
  .filter(s"m = ${ts.month} AND d = ${ts.day}")
  .coalesce(400)
val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
The source may have hundreds to thousands of partitions, and the total parquet size is 
around 1 to 5 GB. I also find that in the stage that shuffle-reads data from different 
machines, the shuffle read size is about 4 times larger than the input size, which is 
weird, or governed by some principle I don't know. 

Anyway, I have done some searching on this problem myself. Some articles say it is about 
direct buffer memory (which I don't set myself); some say people solved it with more 
frequent full GC. I also found someone on SO with a very similar situation: 
http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
He claimed it is a bug in Parquet, but a comment questioned him. People on this mailing 
list may also have received an email hours ago from blondowski, who described this 
problem while writing JSON: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none

So it looks like a common question across different output formats. I hope someone with 
experience of this problem could explain why it happens and what a reliable way to solve 
it is. 

Best,




filter push down on har file

2017-01-16 Thread Yang Cao
Hi,

My team just archived last year's parquet files. I wonder whether the filter push-down 
optimization still works when I read data through "har:///path/to/data/"? Thanks.

Best,
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kryo On Spark 1.6.0

2017-01-10 Thread Yang Cao
If you don't mind, could you please share the Scala solution with me? I tried to use 
Kryo but it seemed not to work at all. I hope to get a practical example. Thanks!
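A minimal Scala sketch, assuming the standard SparkConf Kryo settings; MyCaseClass stands in for whatever application classes you register yourself:

```
import org.apache.spark.SparkConf

case class MyCaseClass(id: Int, name: String)  // placeholder application class

val conf = new SparkConf()
  .setAppName("kryo-registration-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")

// Register the Scala class from the error message along with your own classes.
conf.registerKryoClasses(Array(
  classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
  classOf[MyCaseClass]
))
```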
> On 10 Jan 2017, at 19:10, Enrico DUrso  wrote:
> 
> Hi,
> 
> I am trying to use Kryo on Spark 1.6.0.
> I am able to register my own classes and it works, but when I set 
> “spark.kryo.registrationRequired “ to true, I get an error about a scala 
> class:
> “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”.
> 
> Any of you has already solved this issue in Java? I found the code to solve 
> it in Scala, but unable to register this class in Java.
> 
> Cheers,
> 
> enrico
> 
> 
> CONFIDENTIALITY WARNING.
> This message and the information contained in or attached to it are private 
> and confidential and intended exclusively for the addressee. everis informs 
> to whom it may receive it in error that it contains privileged information 
> and its use, copy, reproduction or distribution is prohibited. If you are not 
> an intended recipient of this E-mail, please notify the sender, delete it and 
> do not read, act upon, print, disclose, copy, retain or redistribute any 
> portion of this E-mail.



Re: L1 regularized Logistic regression ?

2017-01-04 Thread Yang
ah, found it, it's https://www.google.com/search?q=OWLQN

thanks!
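A minimal sketch of the spark.ml route mentioned in the quoted reply below; the regParam value is an arbitrary placeholder and `training` is an assumed DataFrame with "label" and "features" columns:

```
import org.apache.spark.ml.classification.LogisticRegression

// elasticNetParam = 1.0 gives a pure L1 (lasso) penalty; 0.0 would be pure L2.
val lr = new LogisticRegression()
  .setElasticNetParam(1.0)
  .setRegParam(0.01)   // placeholder regularization strength
  .setMaxIter(100)

val model = lr.fit(training)
```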

On Wed, Jan 4, 2017 at 7:34 PM, J G <jonrgr...@gmail.com> wrote:

> I haven't run this, but there is an elasticnetparam for Logistic
> Regression here: https://spark.apache.org/docs/2.0.2/ml-
> classification-regression.html#logistic-regression
>
> You'd set elasticnetparam = 1 for Lasso
>
> On Wed, Jan 4, 2017 at 7:13 PM, Yang <tedd...@gmail.com> wrote:
>
>> does mllib support this?
>>
>> I do see Lasso impl here https://github.com/apache
>> /spark/blob/master/mllib/src/main/scala/org/apache/spark/
>> mllib/regression/Lasso.scala
>>
>> if it supports LR , could you please show me a link? what algorithm does
>> it use?
>>
>> thanks
>>
>>
>


L1 regularized Logistic regression ?

2017-01-04 Thread Yang
Does MLlib support this?

I do see a Lasso impl here:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala

If it supports LR, could you please show me a link? What algorithm does it use?

thanks


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

This "inline view" idea is really awesome and enlightens me! Finally I have
a plan to move on. I greatly appreciate your help!

Best regards,
Yang
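A minimal sketch combining the two pieces from the quoted reply below: an inline view for picking up only new rows, plus the partitioning options for parallelism. It assumes a SparkSession named spark; the table, column, URL, bounds, and checkpoint value are all placeholders:

```
// Placeholder checkpoint persisted by the previous successful run.
val checkPointedValue = "2017-01-01 00:00:00"

// Inline view: the database only returns rows newer than the checkpoint.
val viewSQL =
  s"(SELECT * FROM sensor_data WHERE inserted_on > '$checkPointedValue') AS t"

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/mydb")  // placeholder JDBC URL
  .option("dbtable", viewSQL)
  .option("partitionColumn", "id")                  // numeric column used for striding
  .option("lowerBound", "0")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .load()
```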

2017-01-03 18:14 GMT+01:00 ayan guha <guha.a...@gmail.com>:

> Ahh I see what you meanI confused two terminologiesbecause we were
> talking about partitioning and then changed topic to identify changed data
> 
>
> For that, you can "construct" a dbtable as an inline view -
>
> viewSQL = "(select * from table where <column_name> > '<checkpoint value>')".
> replace("<column_name>","inserted_on").replace("<checkpoint value>",checkPointedValue)
> dbtable =viewSQL
>
> refer to this
> <http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/>
> blog...
>
> So, in summary, you have 2 things
>
> 1. Identify changed data - my suggestion to use dbtable with inline view
> 2. parallelism - use numPartition,lowerbound,upper bound to generate
> number of partitions
>
> HTH
>
>
>
> On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> Yeah, I understand your proposal, but according to here
>> http://spark.apache.org/docs/latest/sql-programming-gui
>> de.html#jdbc-to-other-databases, it says
>>
>> Notice that lowerBound and upperBound are just used to decide the
>> partition stride, not for filtering the rows in table. So all rows in the
>> table will be partitioned and returned. This option applies only to reading.
>>
>> So my interpretation is all rows in the table are ingested, and this
>> "lowerBound" and "upperBound" is the span of each partition. Well, I am not
>> a native English speaker, maybe it means differently?
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 17:23 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>
>>> Hi
>>>
>>> You need to store and capture the Max of the column you intend to use
>>> for identifying new records (Ex: INSERTED_ON) after every successful run of
>>> your job. Then, use the value in lowerBound option.
>>>
>>> Essentially, you want to create a query like
>>>
>>> select * from table where INSERTED_ON > lowerBound and
>>> INSERTED_ON>>
>>> everytime you run the job
>>>
>>>
>>>
>>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>>
>>>> Hi Ayan,
>>>>
>>>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>>>
>>>> Concerning your suggestion for Spark, it is indeed parallelized with
>>>> multiple workers, but the job is one-off and cannot keep streaming.
>>>> Moreover, I cannot specify any "start row" in the job, it will always
>>>> ingest the entire table. So I also cannot simulate a streaming process by
>>>> starting the job in fix intervals...
>>>>
>>>> Best regards,
>>>> Yang
>>>>
>>>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>>
>>>>> Hi
>>>>>
>>>>> While the solutions provided by others looks promising and I'd like to
>>>>> try out few of them, our old pal sqoop already "does" the job. It has a
>>>>> incremental mode where you can provide a --check-column and
>>>>> --last-modified-value combination to grab the data - and yes, sqoop
>>>>> essentially does it by running a MAP-only job which spawns number of
>>>>> parallel map task to grab data from DB.
>>>>>
>>>>> In Spark, you can use sqlContext.load function for JDBC and use
>>>>> partitionColumn and numPartition to define parallelism of connection.
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ayan,
>>>>>>
>>>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>>>
>>>>>> I think this use case can be generalized, because the data is
>>>>>> immutable and append-only. We only need to find one column or timestamp 
>>>>>> to
>>>>>> track the last row consumed in the previous ingestion. This pattern 
>>>>>> should
>>>>>> be common when storing sensor data. If the data is mutable, then the solution will be surely difficult and vendor specific as you said.

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

Yeah, I understand your proposal, but according to
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases,
it says:

Notice that lowerBound and upperBound are just used to decide the partition
stride, not for filtering the rows in table. So all rows in the table will
be partitioned and returned. This option applies only to reading.

So my interpretation is that all rows in the table are ingested, and
"lowerBound" and "upperBound" only define the span of each partition. Well, I am
not a native English speaker; maybe it means something different?

Best regards,
Yang
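To make the stride behaviour concrete, a small sketch; the WHERE clauses in the comments are roughly what the JDBC source generates for these placeholder settings, and every row is still returned:

```
val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/mydb")  // placeholder
  .option("dbtable", "sensor_data")                 // placeholder
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "100")
  .option("numPartitions", "4")
  .load()

// The four partition queries look roughly like:
//   id < 25 OR id IS NULL
//   id >= 25 AND id < 50
//   id >= 50 AND id < 75
//   id >= 75
// i.e. the bounds only decide how rows are split, not which rows are read.
```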

2017-01-03 17:23 GMT+01:00 ayan guha <guha.a...@gmail.com>:

> Hi
>
> You need to store and capture the Max of the column you intend to use for
> identifying new records (Ex: INSERTED_ON) after every successful run of
> your job. Then, use the value in lowerBound option.
>
> Essentially, you want to create a query like
>
> select * from table where INSERTED_ON > lowerBound and
> INSERTED_ON
> everytime you run the job
>
>
>
> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>
>> Concerning your suggestion for Spark, it is indeed parallelized with
>> multiple workers, but the job is one-off and cannot keep streaming.
>> Moreover, I cannot specify any "start row" in the job, it will always
>> ingest the entire table. So I also cannot simulate a streaming process by
>> starting the job in fix intervals...
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>
>>> Hi
>>>
>>> While the solutions provided by others looks promising and I'd like to
>>> try out few of them, our old pal sqoop already "does" the job. It has a
>>> incremental mode where you can provide a --check-column and
>>> --last-modified-value combination to grab the data - and yes, sqoop
>>> essentially does it by running a MAP-only job which spawns number of
>>> parallel map task to grab data from DB.
>>>
>>> In Spark, you can use sqlContext.load function for JDBC and use
>>> partitionColumn and numPartition to define parallelism of connection.
>>>
>>> Best
>>> Ayan
>>>
>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>>
>>>> Hi Ayan,
>>>>
>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>
>>>> I think this use case can be generalized, because the data is immutable
>>>> and append-only. We only need to find one column or timestamp to track the
>>>> last row consumed in the previous ingestion. This pattern should be common
>>>> when storing sensor data. If the data is mutable, then the solution will be
>>>> surely difficult and vendor specific as you said.
>>>>
>>>> The workflow you proposed is very useful. The difficulty part is how to
>>>> parallelize the ingestion task. With Spark when I have multiple workers
>>>> working on the same job, I don't know if there is a way and how to
>>>> dynamically change the row range each worker should process in realtime...
>>>>
>>>> I tried to find out if there is any candidate available out of the box,
>>>> instead of reinventing the wheel. At this moment I have not discovered any
>>>> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
>>>> proper candidate from your knowledge?
>>>>
>>>> Thank you again and have a nice day.
>>>>
>>>> Best regards,
>>>> Yang
>>>>
>>>>
>>>>
>>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>>
>>>>>
>>>>>
>>>>> "If data ingestion speed is faster than data production speed, then
>>>>> eventually the entire database will be harvested and those workers will
>>>>> start to "tail" the database for new data streams and the processing
>>>>> becomes real time."
>>>>>
>>>>> This part is really database dependent. So it will be hard to
>>>>> generalize it. For example, say you have a batch interval of 10
>>>>> secswhat happens if you get more than one updates on the same row
>>>>> within 10 secs? You will get a snapshot of every 10 secs. Now, different

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

Thanks a lot for your suggestion. I am currently looking into Sqoop.

Concerning your suggestion for Spark, it is indeed parallelized with
multiple workers, but the job is one-off and cannot keep streaming.
Moreover, I cannot specify any "start row" for the job; it will always
ingest the entire table. So I also cannot simulate a streaming process by
starting the job at fixed intervals...

Best regards,
Yang

2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:

> Hi
>
> While the solutions provided by others looks promising and I'd like to try
> out few of them, our old pal sqoop already "does" the job. It has a
> incremental mode where you can provide a --check-column and
> --last-modified-value combination to grab the data - and yes, sqoop
> essentially does it by running a MAP-only job which spawns number of
> parallel map task to grab data from DB.
>
> In Spark, you can use sqlContext.load function for JDBC and use
> partitionColumn and numPartition to define parallelism of connection.
>
> Best
> Ayan
>
> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> Thanks a lot for such a detailed response. I really appreciate it!
>>
>> I think this use case can be generalized, because the data is immutable
>> and append-only. We only need to find one column or timestamp to track the
>> last row consumed in the previous ingestion. This pattern should be common
>> when storing sensor data. If the data is mutable, then the solution will be
>> surely difficult and vendor specific as you said.
>>
>> The workflow you proposed is very useful. The difficulty part is how to
>> parallelize the ingestion task. With Spark when I have multiple workers
>> working on the same job, I don't know if there is a way and how to
>> dynamically change the row range each worker should process in realtime...
>>
>> I tried to find out if there is any candidate available out of the box,
>> instead of reinventing the wheel. At this moment I have not discovered any
>> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
>> proper candidate from your knowledge?
>>
>> Thank you again and have a nice day.
>>
>> Best regards,
>> Yang
>>
>>
>>
>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>
>>>
>>>
>>> "If data ingestion speed is faster than data production speed, then
>>> eventually the entire database will be harvested and those workers will
>>> start to "tail" the database for new data streams and the processing
>>> becomes real time."
>>>
>>> This part is really database dependent. So it will be hard to generalize
>>> it. For example, say you have a batch interval of 10 secswhat happens
>>> if you get more than one updates on the same row within 10 secs? You will
>>> get a snapshot of every 10 secs. Now, different databases provide different
>>> mechanisms to expose all DML changes, MySQL has binlogs, oracle has log
>>> shipping, cdc,golden gate and so ontypically it requires new product or
>>> new licenses and most likely new component installation on production db :)
>>>
>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>> solution can be achieved fairly easily by
>>>
>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>>> 3. Running an extraction/load mechanism which will take data from DB
>>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
>>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
>>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>>> ETL tools would too...
>>> 4. Finally, update check point...
>>>
>>> You may "determine" checkpoint from the data you already have in HDFS if
>>> you create a Hive structure on it.
>>>
>>> Best
>>> AYan
>>>
>>>
>>>
>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd@gmail.com> wrote:
>>>
>>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>>> table is append-only), send the log through kafka and then consume it by
>>>> spark streaming?
>>>>
>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> We don't support this yet, but I've opened this JIRA as it sounds generally useful.

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Tamas,

Thanks a lot for your suggestion! I will also investigate this one later.

Best regards,
Yang

2017-01-03 12:38 GMT+01:00 Tamas Szuromi <tamas.szur...@odigeo.com>:

>
> You can also try https://github.com/zendesk/maxwell
>
> Tamas
>
> On 3 January 2017 at 12:25, Amrit Jangid <amrit.jan...@goibibo.com> wrote:
>
>> You can try out *debezium* : https://github.com/debezium. it reads data
>> from bin-logs, provides structure and stream into Kafka.
>>
>> Now Kafka can be your new source for streaming.
>>
>> On Tue, Jan 3, 2017 at 4:36 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>
>>> Hi Hongdi,
>>>
>>> Thanks a lot for your suggestion. The data is truely immutable and the
>>> table is append-only. But actually there are different databases involved,
>>> so the only feature they share in common and I can depend on is jdbc...
>>>
>>> Best regards,
>>> Yang
>>>
>>>
>>> 2016-12-30 6:45 GMT+01:00 任弘迪 <ryan.hd@gmail.com>:
>>>
>>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>>> table is append-only), send the log through kafka and then consume it by
>>>> spark streaming?
>>>>
>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>
>>>>> In the mean time you could try implementing your own Source, but that
>>>>> is pretty low level and is not yet a stable API.
>>>>>
>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>>> yyz1...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>>
>>>>>> I don't want to waste your time, so before I write to you, I googled,
>>>>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>>>>> and "jdbc". But I was not able to get any solution to my use case. I 
>>>>>> hope I
>>>>>> can get some clarification from you.
>>>>>>
>>>>>> The use case is quite straightforward, I need to harvest a relational
>>>>>> database via jdbc, do something with data, and store result into Kafka. I
>>>>>> am stuck at the first step, and the difficulty is as follows:
>>>>>>
>>>>>> 1. The database is too large to ingest with one thread.
>>>>>> 2. The database is dynamic and time series data comes in constantly.
>>>>>>
>>>>>> Then an ideal workflow is that multiple workers process partitions of
>>>>>> data incrementally according to a time window. For example, the 
>>>>>> processing
>>>>>> starts from the earliest data with each batch containing data for one 
>>>>>> hour.
>>>>>> If data ingestion speed is faster than data production speed, then
>>>>>> eventually the entire database will be harvested and those workers will
>>>>>> start to "tail" the database for new data streams and the processing
>>>>>> becomes real time.
>>>>>>
>>>>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>>>>> divided by time windows, but how can I dynamically increment the time
>>>>>> windows during execution? Assume that there are two workers ingesting 
>>>>>> data
>>>>>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next 
>>>>>> task
>>>>>> for 2017-01-03. But I am not able to find out how to increment those 
>>>>>> values
>>>>>> during execution.
>>>>>>
>>>>>> Then I looked into Structured Streaming. It looks much more promising
>>>>>> because window operations based on event time are considered during
>>>>>> streaming, which could be the solution to my use case. However, from
>>>>>> documentation and code example I did not find anything related to 
>>>>>> streaming
>>>>>> data from a growing database. Is there anything I can read to achieve my
>>>>>> goal?
>>>>>>
>>>>>> Any suggestion is highly appreciated. Thank you very much and have a
>>>>>> nice day.
>>>>>>
>>>>>> Best regards,
>>>>>> Yang
>>>>>> -
>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>>
>> Regards,
>> Amrit
>> Data Team
>>
>
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Amrit,

Thanks a lot for your suggestion! I will investigate it later.

Best regards,
Yang

2017-01-03 12:25 GMT+01:00 Amrit Jangid <amrit.jan...@goibibo.com>:

> You can try out *debezium* : https://github.com/debezium. it reads data
> from bin-logs, provides structure and stream into Kafka.
>
> Now Kafka can be your new source for streaming.
>
> On Tue, Jan 3, 2017 at 4:36 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
>> Hi Hongdi,
>>
>> Thanks a lot for your suggestion. The data is truely immutable and the
>> table is append-only. But actually there are different databases involved,
>> so the only feature they share in common and I can depend on is jdbc...
>>
>> Best regards,
>> Yang
>>
>>
>> 2016-12-30 6:45 GMT+01:00 任弘迪 <ryan.hd@gmail.com>:
>>
>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>> table is append-only), send the log through kafka and then consume it by
>>> spark streaming?
>>>
>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>
>>>> In the mean time you could try implementing your own Source, but that
>>>> is pretty low level and is not yet a stable API.
>>>>
>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>> yyz1...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>
>>>>> I don't want to waste your time, so before I write to you, I googled,
>>>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>>>> and "jdbc". But I was not able to get any solution to my use case. I hope 
>>>>> I
>>>>> can get some clarification from you.
>>>>>
>>>>> The use case is quite straightforward, I need to harvest a relational
>>>>> database via jdbc, do something with data, and store result into Kafka. I
>>>>> am stuck at the first step, and the difficulty is as follows:
>>>>>
>>>>> 1. The database is too large to ingest with one thread.
>>>>> 2. The database is dynamic and time series data comes in constantly.
>>>>>
>>>>> Then an ideal workflow is that multiple workers process partitions of
>>>>> data incrementally according to a time window. For example, the processing
>>>>> starts from the earliest data with each batch containing data for one 
>>>>> hour.
>>>>> If data ingestion speed is faster than data production speed, then
>>>>> eventually the entire database will be harvested and those workers will
>>>>> start to "tail" the database for new data streams and the processing
>>>>> becomes real time.
>>>>>
>>>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>>>> divided by time windows, but how can I dynamically increment the time
>>>>> windows during execution? Assume that there are two workers ingesting data
>>>>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next 
>>>>> task
>>>>> for 2017-01-03. But I am not able to find out how to increment those 
>>>>> values
>>>>> during execution.
>>>>>
>>>>> Then I looked into Structured Streaming. It looks much more promising
>>>>> because window operations based on event time are considered during
>>>>> streaming, which could be the solution to my use case. However, from
>>>>> documentation and code example I did not find anything related to 
>>>>> streaming
>>>>> data from a growing database. Is there anything I can read to achieve my
>>>>> goal?
>>>>>
>>>>> Any suggestion is highly appreciated. Thank you very much and have a
>>>>> nice day.
>>>>>
>>>>> Best regards,
>>>>> Yang
>>>>> -
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
>
> Regards,
> Amrit
> Data Team
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Ayan,

Thanks a lot for such a detailed response. I really appreciate it!

I think this use case can be generalized, because the data is immutable and
append-only. We only need to find one column or timestamp to track the last
row consumed in the previous ingestion. This pattern should be common when
storing sensor data. If the data is mutable, then the solution will surely be
difficult and vendor-specific, as you said.

The workflow you proposed is very useful. The difficult part is how to
parallelize the ingestion task. With Spark, when I have multiple workers
working on the same job, I don't know if there is a way (and how) to
dynamically change the row range each worker should process in real time...

I tried to find out if there is any candidate available out of the box,
instead of reinventing the wheel. At this moment I have not discovered any
existing tool that can parallelize ingestion tasks on one database. Is Sqoop a
proper candidate, to your knowledge?

Thank you again and have a nice day.

Best regards,
Yang



2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:

>
>
> "If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time."
>
> This part is really database dependent. So it will be hard to generalize
> it. For example, say you have a batch interval of 10 secswhat happens
> if you get more than one updates on the same row within 10 secs? You will
> get a snapshot of every 10 secs. Now, different databases provide different
> mechanisms to expose all DML changes, MySQL has binlogs, oracle has log
> shipping, cdc,golden gate and so ontypically it requires new product or
> new licenses and most likely new component installation on production db :)
>
> So, if we keep real CDC solutions out of scope, a simple snapshot solution
> can be achieved fairly easily by
>
> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
> 3. Running an extraction/load mechanism which will take data from DB
> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
> ETL tools would too...
> 4. Finally, update check point...
>
> You may "determine" checkpoint from the data you already have in HDFS if
> you create a Hive structure on it.
>
> Best
> AYan
>
>
>
> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd@gmail.com> wrote:
>
>> why not sync binlog of mysql(hopefully the data is immutable and the
>> table is append-only), send the log through kafka and then consume it by
>> spark streaming?
>>
>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> We don't support this yet, but I've opened this JIRA as it sounds
>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>
>>> In the mean time you could try implementing your own Source, but that is
>>> pretty low level and is not yet a stable API.
>>>
>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1...@gmail.com
>>> > wrote:
>>>
>>>> Hi all,
>>>>
>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>
>>>> I don't want to waste your time, so before I write to you, I googled,
>>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>>> and "jdbc". But I was not able to get any solution to my use case. I hope I
>>>> can get some clarification from you.
>>>>
>>>> The use case is quite straightforward, I need to harvest a relational
>>>> database via jdbc, do something with data, and store result into Kafka. I
>>>> am stuck at the first step, and the difficulty is as follows:
>>>>
>>>> 1. The database is too large to ingest with one thread.
>>>> 2. The database is dynamic and time series data comes in constantly.
>>>>
>>>> Then an ideal workflow is that multiple workers process partitions of
>>>> data incrementally according to a time window. For example, the processing
>>>> starts from the earliest data with each batch containing data for one hour.
>>>> If data ingestion speed is faster than data production speed, then
>>>> eventually the entire database will be harvested and those workers will start to "tail" the database for new data streams and the processing becomes real time.

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Hongdi,

Thanks a lot for your suggestion. The data is truly immutable and the
table is append-only. But there are actually different databases involved,
so the only feature they share in common that I can depend on is JDBC...

Best regards,
Yang


2016-12-30 6:45 GMT+01:00 任弘迪 <ryan.hd@gmail.com>:

> why not sync binlog of mysql(hopefully the data is immutable and the table
> is append-only), send the log through kafka and then consume it by spark
> streaming?
>
> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> We don't support this yet, but I've opened this JIRA as it sounds
>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>
>> In the mean time you could try implementing your own Source, but that is
>> pretty low level and is not yet a stable API.
>>
>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks a lot for your contributions to bring us new technologies.
>>>
>>> I don't want to waste your time, so before I write to you, I googled,
>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>> and "jdbc". But I was not able to get any solution to my use case. I hope I
>>> can get some clarification from you.
>>>
>>> The use case is quite straightforward, I need to harvest a relational
>>> database via jdbc, do something with data, and store result into Kafka. I
>>> am stuck at the first step, and the difficulty is as follows:
>>>
>>> 1. The database is too large to ingest with one thread.
>>> 2. The database is dynamic and time series data comes in constantly.
>>>
>>> Then an ideal workflow is that multiple workers process partitions of
>>> data incrementally according to a time window. For example, the processing
>>> starts from the earliest data with each batch containing data for one hour.
>>> If data ingestion speed is faster than data production speed, then
>>> eventually the entire database will be harvested and those workers will
>>> start to "tail" the database for new data streams and the processing
>>> becomes real time.
>>>
>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>> divided by time windows, but how can I dynamically increment the time
>>> windows during execution? Assume that there are two workers ingesting data
>>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
>>> for 2017-01-03. But I am not able to find out how to increment those values
>>> during execution.
>>>
>>> Then I looked into Structured Streaming. It looks much more promising
>>> because window operations based on event time are considered during
>>> streaming, which could be the solution to my use case. However, from
>>> documentation and code example I did not find anything related to streaming
>>> data from a growing database. Is there anything I can read to achieve my
>>> goal?
>>>
>>> Any suggestion is highly appreciated. Thank you very much and have a
>>> nice day.
>>>
>>> Best regards,
>>> Yang
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread Yuanzhe Yang
Hi Michael,

Thanks a lot for your ticket. At least it is the first step.

Best regards,
Yang

2016-12-30 2:01 GMT+01:00 Michael Armbrust <mich...@databricks.com>:

> We don't support this yet, but I've opened this JIRA as it sounds
> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>
> In the mean time you could try implementing your own Source, but that is
> pretty low level and is not yet a stable API.
>
> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> Thanks a lot for your contributions to bring us new technologies.
>>
>> I don't want to waste your time, so before I write to you, I googled,
>> checked stackoverflow and mailing list archive with keywords "streaming"
>> and "jdbc". But I was not able to get any solution to my use case. I hope I
>> can get some clarification from you.
>>
>> The use case is quite straightforward, I need to harvest a relational
>> database via jdbc, do something with data, and store result into Kafka. I
>> am stuck at the first step, and the difficulty is as follows:
>>
>> 1. The database is too large to ingest with one thread.
>> 2. The database is dynamic and time series data comes in constantly.
>>
>> Then an ideal workflow is that multiple workers process partitions of
>> data incrementally according to a time window. For example, the processing
>> starts from the earliest data with each batch containing data for one hour.
>> If data ingestion speed is faster than data production speed, then
>> eventually the entire database will be harvested and those workers will
>> start to "tail" the database for new data streams and the processing
>> becomes real time.
>>
>> With Spark SQL I can ingest data from a JDBC source with partitions
>> divided by time windows, but how can I dynamically increment the time
>> windows during execution? Assume that there are two workers ingesting data
>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
>> for 2017-01-03. But I am not able to find out how to increment those values
>> during execution.
>>
>> Then I looked into Structured Streaming. It looks much more promising
>> because window operations based on event time are considered during
>> streaming, which could be the solution to my use case. However, from
>> documentation and code example I did not find anything related to streaming
>> data from a growing database. Is there anything I can read to achieve my
>> goal?
>>
>> Any suggestion is highly appreciated. Thank you very much and have a nice
>> day.
>>
>> Best regards,
>> Yang
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


[Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2016-12-29 Thread Yuanzhe Yang (杨远哲)
Hi all,

Thanks a lot for your contributions to bring us new technologies.

I don't want to waste your time, so before I write to you, I googled, checked 
stackoverflow and mailing list archive with keywords "streaming" and "jdbc". 
But I was not able to get any solution to my use case. I hope I can get some 
clarification from you.

The use case is quite straightforward, I need to harvest a relational database 
via jdbc, do something with data, and store result into Kafka. I am stuck at 
the first step, and the difficulty is as follows:

1. The database is too large to ingest with one thread.
2. The database is dynamic and time series data comes in constantly.

Then an ideal workflow is that multiple workers process partitions of data 
incrementally according to a time window. For example, the processing starts 
from the earliest data with each batch containing data for one hour. If data 
ingestion speed is faster than data production speed, then eventually the 
entire database will be harvested and those workers will start to "tail" the 
database for new data streams and the processing becomes real time.

With Spark SQL I can ingest data from a JDBC source with partitions divided by 
time windows, but how can I dynamically increment the time windows during 
execution? Assume that there are two workers ingesting data of 2017-01-01 and 
2017-01-02, the one which finishes quicker gets next task for 2017-01-03. But I 
am not able to find out how to increment those values during execution.

Then I looked into Structured Streaming. It looks much more promising because 
window operations based on event time are considered during streaming, which 
could be the solution to my use case. However, from documentation and code 
example I did not find anything related to streaming data from a growing 
database. Is there anything I can read to achieve my goal?

Any suggestion is highly appreciated. Thank you very much and have a nice day.

Best regards,
Yang
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: scikit-learn and mllib difference in predictions python

2016-12-25 Thread Yuhao Yang
Hi ioanna,

I'd like to help look into it. Is there a way to access your training data?

2016-12-20 17:21 GMT-08:00 ioanna :

> I have an issue with an SVM model trained for binary classification using
> Spark 2.0.0.
> I have followed the same logic using scikit-learn and MLlib, using the
> exact
> same dataset.
> For scikit learn I have the following code:
>
> svc_model = SVC()
> svc_model.fit(X_train, y_train)
>
> print "supposed to be 1"
> print svc_model.predict([15 ,15,0,15,15,4,12,8,0,7])
> print
> svc_model.predict([15.0,15.0,15.0,7.0,7.0,15.0,15.0,0.0,12.0,15.0])
> print svc_model.predict([15.0,15.0,7.0,0.0,7.0,0.0,15.0,15.0,15.
> 0,15.0])
> print svc_model.predict([7.0,0.0,15.0,15.0,15.0,15.0,7.0,7.0,15.0,
> 15.0])
>
> print "supposed to be 0"
> print svc_model.predict([18.0, 15.0, 7.0, 7.0, 15.0, 0.0, 15.0, 15.0,
> 15.0, 15.0])
> print svc_model.predict([ 11.0,13.0,7.0,10.0,7.0,13.0,7.
> 0,19.0,7.0,7.0])
> print svc_model.predict([ 15.0, 15.0, 18.0, 7.0, 15.0, 15.0, 15.0,
> 18.0,
> 7.0, 15.0])
> print svc_model.predict([ 15.0, 15.0, 8.0, 0.0, 0.0, 8.0, 15.0, 15.0,
> 15.0, 7.0])
>
>
> and it returns:
>
> supposed to be 1
> [0]
> [1]
> [1]
> [1]
> supposed to be 0
> [0]
> [0]
> [0]
> [0]
>
> For spark am doing:
>
> model_svm = SVMWithSGD.train(trainingData, iterations=100)
>
> model_svm.clearThreshold()
>
> print "supposed to be 1"
> print
> model_svm.predict(Vectors.dense(15.0,15.0,0.0,15.0,15.0,
> 4.0,12.0,8.0,0.0,7.0))
> print
> model_svm.predict(Vectors.dense(15.0,15.0,15.0,7.0,7.0,
> 15.0,15.0,0.0,12.0,15.0))
> print
> model_svm.predict(Vectors.dense(15.0,15.0,7.0,0.0,7.0,0.
> 0,15.0,15.0,15.0,15.0))
> print
> model_svm.predict(Vectors.dense(7.0,0.0,15.0,15.0,15.0,
> 15.0,7.0,7.0,15.0,15.0))
>
> print "supposed to be 0"
> print model_svm.predict(Vectors.dense(18.0, 15.0, 7.0, 7.0, 15.0, 0.0,
> 15.0, 15.0, 15.0, 15.0))
> print
> model_svm.predict(Vectors.dense(11.0,13.0,7.0,10.0,7.0,
> 13.0,7.0,19.0,7.0,7.0))
> print model_svm.predict(Vectors.dense(15.0, 15.0, 18.0, 7.0, 15.0,
> 15.0,
> 15.0, 18.0, 7.0, 15.0))
> print model_svm.predict(Vectors.dense(15.0, 15.0, 8.0, 0.0, 0.0, 8.0,
> 15.0, 15.0, 15.0, 7.0))
>
> which returns:
>
> supposed to be 1
> 12.8250120159
> 16.0786937313
> 14.2139435305
> 16.5115589658
> supposed to be 0
> 17.1311777004
> 14.075461697
> 20.8883372052
> 12.9132580999
>
> when I am setting the threshold I am either getting all zeros or all ones.
>
> Does anyone know how to approach this problem?
>
> As I said I have checked multiple times that my dataset and feature
> extraction logic are exactly the same in both cases.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/scikit-learn-and-mllib-difference-in-
> predictions-python-tp28240.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark linear regression error training dataset is empty

2016-12-25 Thread Yuhao Yang
Hi Xiaomeng,

Have you tried to confirm the DataFrame contents before fitting? like
assembleddata.show()
before fitting.

Regards,
Yuhao
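A few quick checks along those lines, assuming assembleddata is the DataFrame passed to lr.fit in the quoted code below:

```
assembleddata.printSchema()               // confirm "label" and "features" columns exist
assembleddata.show(5, truncate = false)   // eyeball a few rows
println(assembleddata.count())            // confirm the DataFrame is not empty
```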

2016-12-21 10:05 GMT-08:00 Xiaomeng Wan :

> Hi,
>
> I am running linear regression on a dataframe and get the following error:
>
> Exception in thread "main" java.lang.AssertionError: assertion failed:
> Training dataset is empty.
>
> at scala.Predef$.assert(Predef.scala:170)
>
> at org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(
> WeightedLeastSquares.scala:247)
>
> at org.apache.spark.ml.optim.WeightedLeastSquares.fit(
> WeightedLeastSquares.scala:82)
>
> at org.apache.spark.ml.regression.LinearRegression.
> train(LinearRegression.scala:180)
>
> at org.apache.spark.ml.regression.LinearRegression.
> train(LinearRegression.scala:70)
>
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
>
> here is the data and code:
>
> {"label":79.3,"features":{"type":1,"values":[6412.
> 14350001,888.0,1407.0,1.5844594594594594,10.614,12.07,
> 0.12062966031483012,0.9991237664152219,6.065,0.49751449875724935]}}
>
> {"label":72.3,"features":{"type":1,"values":[6306.
> 04450001,1084.0,1451.0,1.338560885608856,7.018,12.04,0.
> 41710963455149497,0.9992054343916128,6.05,0.4975083056478405]}}
>
> {"label":76.7,"features":{"type":1,"values":[6142.
> 9203,1494.0,1437.0,0.9618473895582329,7.939,12.06,
> 0.34170812603648426,0.9992216101762574,6.06,0.49751243781094534]}}
>
> val lr = new LinearRegression().setMaxIter(300).setFeaturesCol("features")
>
> val lrModel = lr.fit(assembleddata)
>
> Any clue or inputs are appreciated.
>
>
> Regards,
>
> Shawn
>
>
>


spark-shell fails to redefine values

2016-12-21 Thread Yang
Summary: spark-shell fails to redefine values in some cases. This is seen at
least in a case where "implicit" is involved, but it is not limited to such
cases.

Run the following in spark-shell; you can see that the last redefinition does
not take effect. The same code runs in the plain Scala REPL without problems.

scala> class Useless{}
defined class Useless

scala> class Useless1 {}
defined class Useless1

scala> implicit val eee :Useless = new Useless()
eee: Useless = Useless@2c6beb3e

scala> implicit val eee:Useless1 = new Useless1()
eee: Useless1 = Useless1@66cb003

scala> eee
res24: Useless = Useless@1ec5bf62


Re: Multilabel classification with Spark MLlib

2016-11-29 Thread Yuhao Yang
If problem transformation is not an option (
https://en.wikipedia.org/wiki/Multi-label_classification#Problem_transformation_methods),
I would try to develop a customized algorithm based
on MultilayerPerceptronClassifier, in which you probably need to
rewrite LabelConverter.
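For the problem-transformation route, a minimal sketch of binary relevance (one independent binary classifier per label); the label column names and the `train` DataFrame are assumptions:

```
import org.apache.spark.ml.classification.LogisticRegression

val labelCols = Seq("label_a", "label_b", "label_c")  // placeholder 0/1 label columns

// Train one binary model per label on the same "features" column.
val models = labelCols.map { labelCol =>
  val lr = new LogisticRegression()
    .setFeaturesCol("features")
    .setLabelCol(labelCol)
  labelCol -> lr.fit(train)  // `train` is an assumed training DataFrame
}.toMap
```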

2016-11-29 9:02 GMT-08:00 Md. Rezaul Karim 
:

> Hello All,
>
> Is there anyone who has developed multilabel classification applications
> with Spark?
>
> I found an example class in Spark distribution (i.e.,
> *JavaMultiLabelClassificationMetricsExample.java*) which is not a
> classifier but an evaluator for a multilabel classification. Moreover, the
> example is not well documented (i.e., I did not understand which one is a
> label and which one is a feature).
>
> More specifically, I was looking for some example implemented in
> Java/Scala/Python so that I can develop my own multi-label classification
> applications.
>
>
>
> Any kind of help would be highly appreciated.
>
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim,* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
>


Re: Kafka segmentation

2016-11-16 Thread bo yang
I did not remember what exact configuration I was using. That link has some
good information! Thanks Cody!
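For reference, a minimal sketch of the two settings discussed in this thread; the rate value is a placeholder:

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // Cap on records pulled per Kafka partition per second when rates spike.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```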


On Wed, Nov 16, 2016 at 5:32 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, if you're reporting issues, please be clear as to whether
> backpressure is enabled, and whether maxRatePerPartition is set.
>
> I expect that there is something wrong with backpressure, see e.g.
> https://issues.apache.org/jira/browse/SPARK-18371
>
> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
> > I hit similar issue with Spark Streaming. The batch size seemed a little
> > random. Sometime it was large with many Kafka messages inside same batch,
> > sometimes it was very small with just a few messages. Is it possible that
> > was caused by the backpressure implementation in Spark Streaming?
> >
> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Moved to user list.
> >>
> >> I'm not really clear on what you're trying to accomplish (why put the
> >> csv file through Kafka instead of reading it directly with spark?)
> >>
> >> auto.offset.reset=largest just means that when starting the job
> >> without any defined offsets, it will start at the highest (most
> >> recent) available offsets.  That's probably not what you want if
> >> you've already loaded csv lines into kafka.
> >>
> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien <hbthien0...@gmail.com
> >
> >> wrote:
> >> > Hi all,
> >> >
> >> > I would like to ask a question related to the size of Kafka stream. I
> >> > want
> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
> get
> >> > the
> >> > output from Kafka and then save to Hive by using SparkSQL. The file
> csv
> >> > is
> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
> >> > integer). I see that Spark Streaming first received two
> >> > partitions/batches,
> >> > the first is of 60K messages and the second is of 50K msgs. But from
> the
> >> > third batch, Spark just received 200 messages for each batch (or
> >> > partition).
> >> > I think that this problem is coming from Kafka or some configuration
> in
> >> > Spark. I already tried to configure with the setting
> >> > "auto.offset.reset=largest", but every batch only gets 200 messages.
> >> >
> >> > Could you please tell me how to fix this problem?
> >> > Thank you so much.
> >> >
> >> > Best regards,
> >> > Alex
> >> >
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>
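A minimal sketch of the two settings discussed above, with placeholder values:
with backpressure enabled, Spark adjusts the ingestion rate per batch, while
maxRatePerPartition caps how many records each Kafka partition contributes per
second, which bounds batch sizes.

import org.apache.spark.SparkConf

// Placeholder values; tune them for your topic's throughput.
val conf = new SparkConf()
  .setAppName("kafka-rate-control")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")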


Re: Kafka segmentation

2016-11-16 Thread bo yang
I hit a similar issue with Spark Streaming. The batch size seemed a little
random. Sometimes it was large with many Kafka messages inside the same batch,
sometimes it was very small with just a few messages. Is it possible that this
was caused by the backpressure implementation in Spark Streaming?

On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger  wrote:

> Moved to user list.
>
> I'm not really clear on what you're trying to accomplish (why put the
> csv file through Kafka instead of reading it directly with spark?)
>
> auto.offset.reset=largest just means that when starting the job
> without any defined offsets, it will start at the highest (most
> recent) available offsets.  That's probably not what you want if
> you've already loaded csv lines into kafka.
>
> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien 
> wrote:
> > Hi all,
> >
> > I would like to ask a question related to the size of Kafka stream. I
> want
> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to get
> the
> > output from Kafka and then save to Hive by using SparkSQL. The file csv
> is
> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
> > integer). I see that Spark Streaming first received two
> partitions/batches,
> > the first is of 60K messages and the second is of 50K msgs. But from the
> > third batch, Spark just received 200 messages for each batch (or
> partition).
> > I think that this problem is coming from Kafka or some configuration in
> > Spark. I already tried to configure with the setting
> > "auto.offset.reset=largest", but every batch only gets 200 messages.
> >
> > Could you please tell me how to fix this problem?
> > Thank you so much.
> >
> > Best regards,
> > Alex
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


type-safe join in the new DataSet API?

2016-11-10 Thread Yang
the new DataSet API is supposed to provide type safety and type checks at
compile time
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

It does this in a lot of places indeed, but I found it still doesn't have
a type-safe join:

val ds1 = hc.sql("select col1, col2 from mytable")

val ds2 = hc.sql("select col3 , col4 from mytable2")

val ds3 = ds1.joinWith(ds2, ds1.col("col1") === ds2.col("col2"))

here spark has no way to make sure (at compile time) that the two columns
being joined together, "col1" and "col2", are of matching types. This is in
contrast to an RDD join, where a mismatch would be detected at compile time.

am I missing something?

thanks
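A hedged sketch of one workaround (the case classes and names below are
illustrative, not from the thread): keying both sides with typed extractors and
joining the RDDs gets the key types checked at compile time.

import org.apache.spark.sql.SparkSession

case class T1(col1: Int, col2: String)
case class T2(col3: Int, col4: String)

val spark = SparkSession.builder().appName("typed-join").getOrCreate()
import spark.implicits._

val ds1 = Seq(T1(1, "a"), T1(2, "b")).toDS()
val ds2 = Seq(T2(1, "x"), T2(3, "y")).toDS()

// RDD join requires both sides to share the key type, so a mismatch is a compile error.
val joined = ds1.rdd.keyBy(_.col1).join(ds2.rdd.keyBy(_.col3))   // RDD[(Int, (T1, T2))]
// ds1.rdd.keyBy(_.col1).join(ds2.rdd.keyBy(_.col4)) would not compile: Int vs String keys
joined.collect().foreach(println)

The price is losing the Catalyst-optimized join, so this is a trade-off rather
than a full replacement for joinWith.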


Do you use spark 2.0 in work?

2016-10-31 Thread Yang Cao
Hi guys,

Just out of personal interest, I wonder whether Spark 2.0 is a production-ready
version. Is there any company using it as its main version in daily work? THX
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



task not serializable in case of groupByKey() + mapGroups + map?

2016-10-31 Thread Yang
with the following simple code


val a = sc.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int)=>x._1})
val mappedGroups = grouped.mapGroups((k,x)=>{(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>{
  val simpley = yyy.value

  1
})



I'm seeing error:
org.apache.spark.SparkException: Task not serializable
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.map(RDD.scala:365)
  ... 56 elided
Caused by: java.io.NotSerializableException:
org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.execution.QueryExecution, value: == Parsed Logical
Plan ==
'AppendColumns , unresolveddeserializer(newInstance(class
scala.Tuple2)), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Analyzed Logical Plan ==
_1: int, _2: int, value: int
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Optimized Logical Plan ==
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Physical Plan ==
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- Scan ExistingRDD[_1#201,_2#202])
- field (class: org.apache.spark.sql.KeyValueGroupedDataset, name:
queryExecution, type: class org.apache.spark.sql.execution.QueryExecution)
- object (class org.apache.spark.sql.KeyValueGroupedDataset,
org.apache.spark.sql.KeyValueGroupedDataset@71148f10)
- field (class: $iw, name: grouped, type: class
org.apache.spark.sql.KeyValueGroupedDataset)
- object (class $iw, $iw@7b1c13e4)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3e9a0c21)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@218cc682)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2ecedd08)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@79efd402)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d81976c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2d5d6e2a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74dc6a7a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e220d85)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1c790a4f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1d954b06)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1343c904)
- field (class: $line115.$read, name: $iw, type: class $iw)
- object (class $line115.$read, $line115.$read@42497908)
- field (class: $iw, name: $line115$read, type: class
$line115.$read)
- object (class $iw, $iw@af36da5)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@2fd5b99a)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, )
  at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 65 more
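A hedged sketch of a common spark-shell workaround, not a confirmed fix for
this exact case: wrapping the action in a method so the task closure only
captures a local reference to the broadcast, instead of the REPL line wrapper
that also holds the non-serializable KeyValueGroupedDataset.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

// The closure below references only the local val, so the REPL's $iw objects
// (and the KeyValueGroupedDataset they hold) are not pulled into the task.
def runJob(mapped: Dataset[(Int, Int)], b: Broadcast[Int]): RDD[Int] = {
  val localB = b
  mapped.rdd.map { _ => localB.value }
}

val last = runJob(mappedGroups, yyy)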


Re: RDD groupBy() then random sort each group ?

2016-10-23 Thread Yang
thanks, this direction seems to be in line with what I want.

What I really want is groupBy() and then, for the rows in each group, to get
an Iterator and run each element from the iterator through a local function
(specifically SGD). Right now the DataSet API provides this, but it's
literally an Iterator, so I can't "reset" it, and SGD does need the ability
to run multiple passes over the data.



On Sat, Oct 22, 2016 at 1:22 PM, Koert Kuipers <ko...@tresata.com> wrote:

> groupBy always materializes the entire group (on disk or in memory) which
> is why you should avoid it for large groups.
>
> The key is to never materialize the grouped and shuffled data.
>
> To see one approach to do this take a look at
> https://github.com/tresata/spark-sorted
>
> It's basically a combination of smart partitioning and secondary sort.
>
> On Oct 20, 2016 1:55 PM, "Yang" <tedd...@gmail.com> wrote:
>
>> in my application, I group by same training samples by their model_id's
>>  (the input table contains training samples for 100k different models),
>> then each group ends up having about 1 million training samples,
>>
>> then I feed that group of samples to a little Logistic Regression solver
>> (SGD), but SGD requires the input data to be shuffled randomly (so that
>> positive and negative samples are evenly distributed), so now I do
>> something like
>>
>> my_input_rdd.groupBy(x=>x.model_id).map(x=>
>> val (model_id, group_of_rows) = x
>>
>>  (model_id, group_of_rows.toSeq().shuffle() )
>>
>> ).map(x=> (x._1, train_sgd(x._2))
>>
>>
>> the issue is that on the 3rd row above, I had to explicitly call toSeq()
>> on the group_of_rows in order to shuffle, which is an Iterable and not Seq.
>> now I have to load the entire 1 million rows into memory, and in practice
>> I've seen my tasks OOM and GC time goes crazy (about 50% of total run
>> time). I suspect this toSeq() is the reason, since doing a simple count()
>> on the groupBy() result works fine.
>>
>> I am planning to shuffle the my_input_rdd first, and then groupBy(), and
>> not do the toSeq().shuffle(). intuitively the input rdd is already
>> shuffled, so UNLESS groupBy() tries to do some sorting, the rows in the
>> group SHOULD remain shuffled  but overall this remains rather flimsy.
>>
>> any ideas to do this more reliably?
>>
>> thanks!
>>
>>


Re: RDD groupBy() then random sort each group ?

2016-10-23 Thread Yang
thanks.

This is exactly what I ended up doing. Though it seemed to work, there seems
to be no guarantee that the randomness from sortWithinPartitions() would be
preserved after I do a further groupBy.



On Fri, Oct 21, 2016 at 3:55 PM, Cheng Lian <l...@databricks.com> wrote:

> I think it would much easier to use DataFrame API to do this by doing
> local sort using randn() as key. For example, in Spark 2.0:
>
> val df = spark.range(100)
> val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))
>
> Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the key
> to group by, then you can get the RDD from shuffled and do the following
> operations you want.
>
> Cheng
>
>
>
> On 10/20/16 10:53 AM, Yang wrote:
>
>> in my application, I group by same training samples by their model_id's
>> (the input table contains training samples for 100k different models), then
>> each group ends up having about 1 million training samples,
>>
>> then I feed that group of samples to a little Logistic Regression solver
>> (SGD), but SGD requires the input data to be shuffled randomly (so that
>> positive and negative samples are evenly distributed), so now I do
>> something like
>>
>> my_input_rdd.groupBy(x=>x.model_id).map(x=>
>> val (model_id, group_of_rows) = x
>>
>>  (model_id, group_of_rows.toSeq().shuffle() )
>>
>> ).map(x=> (x._1, train_sgd(x._2))
>>
>>
>> the issue is that on the 3rd row above, I had to explicitly call toSeq()
>> on the group_of_rows in order to shuffle, which is an Iterable and not Seq.
>> now I have to load the entire 1 million rows into memory, and in practice
>> I've seen my tasks OOM and GC time goes crazy (about 50% of total run
>> time). I suspect this toSeq() is the reason, since doing a simple count()
>> on the groupBy() result works fine.
>>
>> I am planning to shuffle the my_input_rdd first, and then groupBy(), and
>> not do the toSeq().shuffle(). intuitively the input rdd is already
>> shuffled, so UNLESS groupBy() tries to do some sorting, the rows in the
>> group SHOULD remain shuffled  but overall this remains rather flimsy.
>>
>> any ideas to do this more reliably?
>>
>> thanks!
>>
>>
>
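A minimal sketch expanding the snippet above, assuming Spark 2.0; the per-group
training step is a placeholder. The point is that rows arrive in random order
inside each partition and can be streamed to a local routine without first
collecting the group into a Seq.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.randn

val spark = SparkSession.builder().appName("local-shuffle").getOrCreate()
import spark.implicits._

val df = spark.range(100).toDF("id")
val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))

// Placeholder for train_sgd: just count the rows seen in each partition.
val perPartition = shuffled.rdd.mapPartitions(rows => Iterator.single(rows.size))
perPartition.collect().foreach(println)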


RDD groupBy() then random sort each group ?

2016-10-20 Thread Yang
in my application, I group training samples by their model_id's
(the input table contains training samples for 100k different models),
and each group ends up having about 1 million training samples,

then I feed that group of samples to a little Logistic Regression solver
(SGD), but SGD requires the input data to be shuffled randomly (so that
positive and negative samples are evenly distributed), so now I do
something like

my_input_rdd.groupBy(x => x.model_id).map { x =>
  val (model_id, group_of_rows) = x
  // materialize the group and shuffle it so positive/negative samples are interleaved
  (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
}.map(x => (x._1, train_sgd(x._2)))


the issue is that on the 3rd row above, I had to explicitly call toSeq() on
the group_of_rows in order to shuffle, which is an Iterable and not Seq.
now I have to load the entire 1 million rows into memory, and in practice
I've seen my tasks OOM and GC time goes crazy (about 50% of total run
time). I suspect this toSeq() is the reason, since doing a simple count()
on the groupBy() result works fine.

I am planning to shuffle the my_input_rdd first, and then groupBy(), and
not do the toSeq().shuffle(). intuitively the input rdd is already
shuffled, so UNLESS groupBy() tries to do some sorting, the rows in the
group SHOULD remain shuffled  but overall this remains rather flimsy.

any ideas to do this more reliably?

thanks!


Re: can mllib Logistic Regression package handle 10 million sparse features?

2016-10-19 Thread Yang
>
>>
>> > This is with 4x worker nodes, 48 cores & 120GB RAM each. I haven't yet
>> tuned
>>
>> > the tree aggregation depth. But the number of partitions can make a
>>
>> > difference - generally fewer is better since the cost is mostly
>>
>> > communication of the gradient (the gradient computation is < 10% of the
>>
>> > per-iteration time).
>>
>> >
>>
>> > Note that the current impl forces dense arrays for intermediate data
>>
>> > structures, increasing the communication cost significantly. See this
>> PR for
>>
>> > info: https://github.com/apache/spark/pull/12761. Once sparse data
>>
>> > structures are supported for this, the linear models will be orders of
>>
>> > magnitude more scalable for sparse data.
>>
>> >
>>
>> >
>>
>> > On Wed, 5 Oct 2016 at 23:37 DB Tsai <dbt...@dbtsai.com> wrote:
>>
>> >>
>>
>> >> With the latest code in the current master, we're successfully
>>
>> >> training LOR using Spark ML's implementation with 14M sparse features.
>>
>> >> You need to tune the depth of aggregation to make it efficient.
>>
>> >>
>>
>> >> Sincerely,
>>
>> >>
>>
>> >> DB Tsai
>>
>> >> --
>>
>> >> Web: https://www.dbtsai.com
>>
>> >> PGP Key ID: 0x9DCC1DBD7FC7BBB2
>>
>> >>
>>
>> >>
>>
>> >> On Wed, Oct 5, 2016 at 12:00 PM, Yang <tedd...@gmail.com> wrote:
>>
>> >> > anybody had actual experience applying it to real problems of this
>>
>> >> > scale?
>>
>> >> >
>>
>> >> > thanks
>>
>> >> >
>>
>> >>
>>
>> >> -
>>
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> >>
>>
>> >
>>
>>
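A hedged sketch of the tuning mentioned above, assuming a Spark version whose
ml.LogisticRegression exposes setAggregationDepth (2.1+); the tiny dataset
below stands in for a wide, sparse one.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("wide-lor").getOrCreate()
import spark.implicits._

// Stand-in data; in the scenario above the vectors would be sparse with millions of dimensions.
val training = Seq(
  (1.0, Vectors.sparse(3, Array(0), Array(1.0))),
  (0.0, Vectors.sparse(3, Array(2), Array(1.0)))
).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.01)
  .setAggregationDepth(4)   // default is 2; a deeper treeAggregate eases pressure on the driver
val model = lr.fit(training)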


Re: question about the new Dataset API

2016-10-19 Thread Yang
I even added a fake groupByKey on the entire DataSet:


scala> a_ds.groupByKey(k=>1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       2|
+-----+------------------------+




On Tue, Oct 18, 2016 at 11:30 PM, Yang <tedd...@gmail.com> wrote:

> scala> val a = sc.parallelize(Array((1,2),(3,4)))
> a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
> parallelize at :38
>
> scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
> a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]
>
> scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
> res34: org.apache.spark.sql.DataFrame = [TypedCount(org.apache.spark.sql.Row):
> bigint]
>
> scala> res34.show
>
> then it gave me the following error:
>
> Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.
> expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
>
> at $anonfun$1.apply(:46)
> at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
> at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I had to add a groupByKey()
> scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
> +-----+------------------------+
> |value|TypedCount(scala.Tuple2)|
> +-----+------------------------+
> |    1|                       1|
> |    3|                       1|
> +-----+------------------------+
>
> but why does the groupByKey() make it any different? looks like a bug
>
>
>


question about the new Dataset API

2016-10-19 Thread Yang
scala> val a = sc.parallelize(Array((1,2),(3,4)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
parallelize at :38

scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]

scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
res34: org.apache.spark.sql.DataFrame =
[TypedCount(org.apache.spark.sql.Row): bigint]

scala> res34.show

then it gave me the following error:

Caused by: java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
cast to scala.Tuple2

at $anonfun$1.apply(:46)
at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


I had to add a groupByKey()
scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       1|
|    3|                       1|
+-----+------------------------+

but why does the groupByKey() make it any different? looks like a bug


previous stage results are not saved?

2016-10-17 Thread Yang
I'm trying out 2.0, and ran a long job with 10 stages, in spark-shell

it seems that after all 10 finished successfully, if I run the last, or the
9th again,
spark reruns all the previous stages from scratch, instead of utilizing the
partial results.

this is quite serious since I can't experiment while making small changes
to the code.

any idea what part of the spark framework might have caused this ?

thanks
Yang
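A minimal sketch of the usual way around this (spark-shell style; the path and
transformations are placeholders): Spark only reuses results across separate
actions when the intermediate data is explicitly persisted, otherwise each new
action re-runs the lineage, skipping at most stages whose shuffle files still
exist.

import org.apache.spark.storage.StorageLevel

val raw = sc.textFile("/tmp/input")                          // placeholder path
val expensive = raw.map(_.split(",")).filter(_.length > 1)   // placeholder transformations
val cached = expensive.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()   // materialize once; later experiments built on `cached` skip the earlier stages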


question on the structured DataSet API join

2016-10-17 Thread Yang
I'm trying to use the joinWith() method instead of join(), since the former
provides a type-checked result while the latter returns a plain DataFrame.


the signature is DataSet[(T,U)] joinWith(other:DataSet[U], col:Column)



Here the second arg, col: Column, is normally provided by
other.col("col_name"). Again, once we use a string to specify the column,
you can't do compile-time type checks on the validity of the join condition;
for example, you could end up specifying
other.col("a_string_col") === this_ds.col("a_double_col").

I checked the DataSet API doc; it seems there is only this col() method
producing a Column, no other way.

so is there a type-checked way to provide the join condition?


thanks


RE: LDA and Maximum Iterations

2016-09-20 Thread Yang, Yuhao
Hi Frank,

Which version of Spark are you using? Also can you share more information about 
the exception.

If it’s not confidential, you can send the data sample to me 
(yuhao.y...@intel.com) and I can try to investigate.

Regards,
Yuhao

From: Frank Zhang [mailto:dataminin...@yahoo.com.INVALID]
Sent: Monday, September 19, 2016 9:20 PM
To: user@spark.apache.org
Subject: LDA and Maximum Iterations

Hi all,

   I have a question about parameter setting for the LDA model. When I try to set
a large number like 500 for setMaxIterations, the program always fails. There is
a very straightforward LDA tutorial using an example data set in the mllib
package: http://stackoverflow.com/questions/36631991/latent-dirichlet-allocation-lda-algorithm-not-printing-results-in-spark-scala.
The code is here:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("/data/mllib/sample_lda_data.txt") // you might need to 
change the path for the data set
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

But if I change the last line to
val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus), the program 
fails.

I greatly appreciate your help!

Best,

Frank
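A hedged sketch, assuming the failure comes from the very long lineage that
hundreds of EM iterations build up (the thread does not show the exception):
setting a checkpoint directory and interval usually keeps large
setMaxIterations runs stable. It reuses the corpus from the snippet above; the
checkpoint path is a placeholder.

import org.apache.spark.mllib.clustering.LDA

sc.setCheckpointDir("/tmp/lda-checkpoints")
val ldaModel = new LDA()
  .setK(3)
  .setMaxIterations(500)
  .setCheckpointInterval(10)   // checkpoint every 10 iterations to truncate the lineage
  .run(corpus)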





Re: Issues with Spark On Hbase Connector and versions

2016-08-30 Thread Weiqing Yang
The PR will be reviewed soon.

Thanks,
Weiqing

From: Sachin Jain
Date: Sunday, August 28, 2016 at 11:12 PM
To: spats
Cc: user
Subject: Re: Issues with Spark On Hbase Connector and versions

There is a connection leak problem with the Hortonworks HBase connector if you
use HBase 1.2.0. I tried to use Hortonworks' connector and ran into the same
problem.

Have a look at this HBase issue, HBASE-16017 [0]. The fix for it was backported
to 1.3.0, 1.4.0 and 2.0.0. I have raised a ticket on their GitHub repo [1] and
also opened a PR to get this fixed. Check it out [2].

But unfortunately no one has responded to it yet, so it is not merged. The fix
works, though, and I am currently using it without problems. So if you want to
use it, you can use this branch [3], where the pull request changes are already
applied.

Hope it helps!!

[0]: https://issues.apache.org/jira/browse/HBASE-16017
[1]: https://github.com/hortonworks-spark/shc/issues/19
[2]: https://github.com/hortonworks-spark/shc/pull/20
[3]: https://github.com/sachinjain024/shc/tree/Issue-19-Connection-Leak

PS: Cross posting my answer from hbase user mailing list because I think it may 
be helpful to other readers.

On Sat, Aug 27, 2016 at 5:17 PM, spats wrote:
Regarding hbase connector by hortonworks
https://github.com/hortonworks-spark/shc, it would be great if someone can
answer these

1. What versions of HBase & Spark are expected? I could not run the examples
provided using Spark 1.6.0 & HBase 1.2.0.
2. I get an error when I run the example provided there; any pointers on what
I am doing wrong?

It looks like Spark is not reading hbase-site.xml, even though I passed it in
--files when launching spark-shell,
e.g. --files
/etc/hbase/conf/hbase-site.xml,/etc/hbase/conf/hdfs-site.xml,/etc/hbase/conf/core-site.xml

error
16/08/27 12:35:00 WARN zookeeper.ClientCnxn: Session 0x0 for server null,
unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at
org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-Spark-On-Hbase-Connector-and-versions-tp27610.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




Re: java.net.UnknownHostException

2016-08-02 Thread Yang Cao
Actually, I just ran into the same problem. Could you share some code around
the error, so I can figure out whether I can help you? And is "s001.bigdata"
the name of your name node?
> On Aug 2, 2016, at 17:22, pseudo oduesp wrote:
> 
> Someone can help me please?
> 
> 2016-08-01 11:51 GMT+02:00 pseudo oduesp:
> Hi,
> I get the following errors when I try using pyspark 2.0 with ipython on
> yarn. Someone can help me please.
> java.lang.IllegalArgumentException: java.net.UnknownHostException: 
> s001.bigdata.;s003.bigdata;s008bigdata.
> at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
> at 
> org.apache.hadoop.crypto.key.kms.KMSClientProvider.getDelegationTokenService(KMSClientProvider.java:823)
> at 
> org.apache.hadoop.crypto.key.kms.KMSClientProvider.addDelegationTokens(KMSClientProvider.java:779)
> at 
> org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension.addDelegationTokens(KeyProviderDelegationTokenExtension.java:86)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2046)
> at 
> org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$obtainTokensForNamenodes$1.apply(YarnSparkHadoopUtil.scala:133)
> at 
> org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$obtainTokensForNamenodes$1.apply(YarnSparkHadoopUtil.scala:130)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
> at 
> org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.obtainTokensForNamenodes(YarnSparkHadoopUtil.scala:130)
> at 
> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:367)
> at 
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:834)
> at 
> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:167)
> at 
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)
> at org.apache.spark.SparkContext.(SparkContext.scala:500)
> at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:240)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:236)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at 
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at py4j.GatewayConnection.run(GatewayConnection.java:211)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> java.net.UnknownHostException:s001.bigdata.;s003.bigdata;s008bigdata.
> 
> 
> thanks 
> 



create external table from partitioned avro file

2016-07-28 Thread Yang Cao
Hi,

I am using Spark 1.6 and I want to create a Hive external table based on a
partitioned avro file. Currently, I can't find any built-in API to do this.
I tried write.format().saveAsTable with format com.databricks.spark.avro; it
returned an error that it can't find a Hive serde for this. I hit the same
problem with the createExternalTable() function. Spark seems able to recognize
the avro format. I need help with this task and welcome any suggestions.

THX
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
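A hedged sketch of one workaround, assuming a Hive-enabled context and Hive
0.14+ (database, table, columns and location below are placeholders): declare
the external table in HiveQL with STORED AS AVRO, then let Hive discover the
existing partition directories.

// Placeholder schema, partition column and location.
sqlContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS my_db.my_table (id BIGINT, name STRING)
  PARTITIONED BY (dt STRING)
  STORED AS AVRO
  LOCATION '/data/my_table'
""")
sqlContext.sql("MSCK REPAIR TABLE my_db.my_table")   // register the existing partition directories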



get hdfs file path in spark

2016-07-25 Thread Yang Cao
Hi,
Being new here, I hope to get assistance from you guys. I wonder whether there
is an elegant way to get a directory under some path. For example, I have a
path on HDFS like /a/b/c/d/e/f, and I am given /a/b/c; is there any
straightforward way to get the path /a/b/c/d/e? I think I can do it with the
help of a regex, but I still hope to find an easier way that makes my code
cleaner. My env: Spark 1.6, language: Scala.


Thx
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
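A minimal sketch using the Hadoop FileSystem API, assuming a spark-shell `sc`;
the prefix "/a/b/c" is the placeholder from the question.

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
// List the immediate subdirectories under the given prefix.
val subDirs = fs.listStatus(new Path("/a/b/c"))
  .filter(_.isDirectory)
  .map(_.getPath.toString)
subDirs.foreach(println)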



how do I set TBLPROPERTIES in dataFrame.saveAsTable()?

2016-06-15 Thread Yang
I tried df.options(Map(prop_name -> prop_value)).saveAsTable(tb_name)

doesn't seem to work

thanks a lot!
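A hedged sketch of one workaround, since the writer API does not appear to
expose TBLPROPERTIES directly (df, the table name and the property are
placeholders from the question, assuming a Hive-backed sqlContext): save the
table first, then set the properties with an ALTER TABLE statement.

df.write.saveAsTable("tb_name")
sqlContext.sql("ALTER TABLE tb_name SET TBLPROPERTIES ('my.prop' = 'my_value')")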


Re: OutOfMemoryError - When saving Word2Vec

2016-06-13 Thread Yuhao Yang
Hi Sharad,

what's your vocabulary size and vector length for Word2Vec?

Regards,
Yuhao

2016-06-13 20:04 GMT+08:00 sharad82 :

> Is this the right forum to post Spark-related issues? I have tried this
> forum along with StackOverflow but am not seeing any response.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-When-saving-Word2Vec-tp27142p27151.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

