[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-13 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396749#comment-16396749
 ] 

Fabian Hueske commented on FLINK-8828:
--

IMO, the issue of implicit conversions is unrelated to this one.
I'd be fine with removing them, but that would obviously break programs and 
should be handled as a separate issue.
We can of course adjust the Table API tests to explicitly convert {{Table}} 
into {{DataSet}} and {{DataStream}}.
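
For illustration, a minimal sketch of such an explicit conversion (assuming a 
Scala batch {{TableEnvironment}}; the table name and data below are made up):

{noformat}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

tEnv.registerDataSet("T", env.fromElements(("a", 1), ("b", 2)), 'name, 'value)

// explicit conversion instead of relying on the implicit one
val rows: DataSet[Row] = tEnv.toDataSet[Row](tEnv.scan("T"))
println(rows.collect()){noformat}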

The actual question is how to handle the conflicting method name. 
{{DataSet.collect()}} is declared as {{@Public}} and can't be removed or 
renamed before Flink 2.0.

I'm not sure overloading the method is a good idea. Having two methods with the 
same name and different behavior doesn't sound right.
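
To make that concrete, a sketch of the two behaviors that would then share one 
name (the second call is the proposed variant, not existing API):

{noformat}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val ds: DataSet[Int] = env.fromElements(1, 2, 3)

// existing @Public method: executes the job and fetches results to the client
val local: Seq[Int] = ds.collect()

// proposed overload: a lazy transformation that stays in the plan
// val tens: DataSet[Int] = ds.collect { case i if i % 2 == 0 => i * 10 }{noformat}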

> Add collect method to DataStream / DataSet scala api
> 
>
> Key: FLINK-8828
> URL: https://issues.apache.org/jira/browse/FLINK-8828
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataSet API, DataStream API, Scala API
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>
> A collect function is a method that takes a partial function as its parameter 
> and applies it to every element for which the partial function is defined, 
> producing a new collection of the results.
> It can be found on all [core Scala collection 
> classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html]
>  as well as on Spark's [RDD 
> interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD].
> To understand its utility, imagine the following scenario:
> Given a DataStream that produces events of type _Purchase_ and _View_, 
> transform it into a stream of purchase amounts over 1000 euros.
> Currently, an implementation might look like:
> {noformat}
> val x = dataStream
>   .filter(_.isInstanceOf[Purchase])
>   .map(_.asInstanceOf[Purchase])
>   .filter(_.amount > 1000)
>   .map(_.amount){noformat}
> Alternatively, you could do this:
> {noformat}
> dataStream.flatMap(_ match {
>   case p: Purchase if p.amount > 1000 => Some(p.amount)
>   case _ => None
> }){noformat}
> But with collect implemented, it could look like:
> {noformat}
> dataStream.collect {
>   case p: Purchase if p.amount > 1000 => p.amount
> }{noformat}
> which is a lot nicer to both read and write.
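
For reference, a minimal sketch of how such a method could be implemented as an 
extension on the Scala {{DataStream}} (illustrative only; the name 
{{collectWith}} is made up here to sidestep the naming clash discussed above):

{noformat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

implicit class CollectOps[T](stream: DataStream[T]) {
  // keeps the elements on which the partial function is defined and
  // maps them in a single pass, delegating to flatMap
  def collectWith[R: TypeInformation](pf: PartialFunction[T, R]): DataStream[R] =
    stream.flatMap((t: T) => pf.lift(t).toList)
}

// usage: purchases.collectWith { case p: Purchase if p.amount > 1000 => p.amount }{noformat}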





[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389832#comment-16389832
 ] 

Stephan Ewen commented on FLINK-8828:
-

[~fhueske] and [~twalthr] Please have a look at [~jelmer]'s comment here; I 
think he has a very good point.

Concerning the method name, could we use a method {{collectStream}} or 
{{collectElements}} or something like that?

Alternatively, could we overload the {{collect}} method and rename the current 
{{collect()}} method eventually?



[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-03 Thread Jelmer Kuperus (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384761#comment-16384761
 ] 

Jelmer Kuperus commented on FLINK-8828:
---

You are right. In fact, the pre-existing method called collect() is already 
biting me :(. TableSourceITCase is not compiling because the implicit 
conversions defined in 
[org.apache.flink.table.api.scala._|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala]
 now make the no-args collect method ambiguous.



And there does not seem to be a good way to make this class compile without 
renaming the method I introduced to something else.

The problem is that Scala developers expect this method to be called collect.



As an aside, I think using implicit conversions the way 
[org.apache.flink.table.api.scala._|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala]
 does is somewhat frowned upon in the Scala world. A slightly more explicit 
approach, giving fewer unexpected surprises like this one, might look something like:


{noformat}
implicit class RichTable[T](table: Table) {

  def asRowDataSet: DataSet[Row] = {
    val tableEnv = table.tableEnv.asInstanceOf[ScalaBatchTableEnv]
    tableEnv.toDataSet[Row](table)
  }

  def asRowDataStream: DataStream[Row] = {
    val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
    tableEnv.toAppendStream[Row](table)
  }
}{noformat}
Then this code
{noformat}
val results = tEnv.scan("T")
  .select('name, 'rtime, 'val)
  .collect(){noformat}
could be rewritten as:
{noformat}
val results = tEnv.scan("T")
  .select('name, 'rtime, 'val)
  .asRowDataSet
  .collect(){noformat}


[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384046#comment-16384046
 ] 

ASF GitHub Bot commented on FLINK-8828:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5616
  
Need to add to the Java API as well.




[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383491#comment-16383491
 ] 

Stephan Ewen commented on FLINK-8828:
-

Interesting suggestion, and it looks like a pretty lightweight self-contained 
addition, so that's nice!

One thing I would raise is the name "collect". The DataSet API uses "collect" 
to mean 'pull the data set back to the client', and the streaming API has an 
experimental feature that does the same for a data stream, also under the 
name "collect"; see 
{{org.apache.flink.streaming.api.datastream.DataStreamUtils}}.
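
For reference, a sketch of those existing semantics ({{DataStreamUtils.collect}} 
operates on the underlying Java stream, which the Scala wrapper exposes as 
{{javaStream}}):

{noformat}
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.datastream.DataStreamUtils

val senv = StreamExecutionEnvironment.getExecutionEnvironment
val stream = senv.fromElements(1, 2, 3)

// experimental: pulls the stream's elements back to the client as an iterator
val it: Iterator[Int] = DataStreamUtils.collect(stream.javaStream).asScala
it.foreach(println){noformat}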



[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383391#comment-16383391
 ] 

ASF GitHub Bot commented on FLINK-8828:
---

GitHub user jelmerk opened a pull request:

https://github.com/apache/flink/pull/5616

[FLINK-8828] [stream, dataset, scala] Introduce collect method

## What is the purpose of the change

A collect function takes a partial function as its parameter and applies 
it to every element for which the partial function is defined, producing a new 
collection of the results. It makes certain things nicer to express.

## Brief change log

- added collect method on scala dataset and datastream api

## Verifying this change

It seems hard to find a place where this could be tested in isolation; 
suggestions welcome. One possible shape is sketched below.
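
One possible shape (a sketch that assumes the proposed method is in place; not 
the PR's actual tests): run a small pipeline through the new method, then 
assert on the client-side results fetched by the existing no-args collect().

{noformat}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val result = env.fromElements(1, 2, 3, 4)
  .collect { case i if i % 2 == 0 => i * 10 }  // proposed method under test
  .collect()                                   // existing: fetch to client

assert(result.sorted == Seq(20, 40)){noformat}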

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: don't know
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jelmerk/flink collect_support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5616


commit 61b4bcb7c941950d62d0db0aa2041f1796fdbaaf
Author: Jelmer Kuperus 
Date:   2018-03-02T09:17:54Z

[FLINK-8828] [stream, dataset, scala] Introduce collect method







--
This message was sent by Atlassian JIRA
(v7.6.3#76005)