[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396749#comment-16396749 ]

Fabian Hueske commented on FLINK-8828:
--

IMO, the issue of implicit conversions is unrelated to this one. I'd be fine with removing them, but that would obviously break programs and should be handled as a separate issue. We can of course adjust the Table API tests to explicitly convert a {{Table}} into a {{DataSet}} or {{DataStream}}.

The actual question is how to handle the conflicting method name. {{DataSet.collect()}} is declared {{@Public}} and can't be removed or renamed before Flink 2.0. I'm not sure overloading the method is a good idea: having two methods with the same name and different behavior doesn't sound right.

> Add collect method to DataStream / DataSet scala api
>
> Key: FLINK-8828
> URL: https://issues.apache.org/jira/browse/FLINK-8828
> Project: Flink
> Issue Type: Improvement
> Components: Core, DataSet API, DataStream API, Scala API
> Affects Versions: 1.4.0
> Reporter: Jelmer Kuperus
> Priority: Major
>
> A collect function is a method that takes a partial function as its parameter
> and applies it to all the elements in the collection to create a new
> collection of the elements on which the partial function is defined.
> It can be found on all [core scala collection
> classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html]
> as well as on Spark's [rdd
> interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD].
>
> To understand its utility, imagine the following scenario: given a DataStream
> that produces events of type _Purchase_ and _View_, transform this stream into
> a stream of purchase amounts over 1000 euros.
>
> Currently an implementation might look like
> {noformat}
> val x = dataStream
>   .filter(_.isInstanceOf[Purchase])
>   .map(_.asInstanceOf[Purchase])
>   .filter(_.amount > 1000)
>   .map(_.amount)
> {noformat}
> Or alternatively you could do this
> {noformat}
> dataStream.flatMap {
>   case p: Purchase if p.amount > 1000 => Some(p.amount)
>   case _ => None
> }
> {noformat}
> But with collect implemented it could look like
> {noformat}
> dataStream.collect {
>   case p: Purchase if p.amount > 1000 => p.amount
> }
> {noformat}
> which is a lot nicer to both read and write.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
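For reference, the proposed {{collect}} can be expressed in terms of {{flatMap}} and {{PartialFunction.lift}}. The sketch below is a minimal, self-contained illustration on a plain Scala {{Seq}} standing in for a {{DataStream}} (the {{Event}}/{{Purchase}}/{{View}} types are the hypothetical ones from the description; this is not the actual Flink operator):

```scala
// Hypothetical event types from the issue description.
sealed trait Event
case class Purchase(amount: Double) extends Event
case class View(page: String) extends Event

object CollectSketch {
  // collect = flatMap over the lifted partial function: pf.lift returns
  // Some(b) where pf is defined and None elsewhere, so elements outside the
  // partial function's domain are simply dropped. The real Flink operator
  // would wrap this logic in a FlatMapFunction instead of using a Seq.
  def collect[A, B](xs: Seq[A])(pf: PartialFunction[A, B]): Seq[B] =
    xs.flatMap(pf.lift(_))
}

object Demo extends App {
  val events: Seq[Event] = Seq(Purchase(1500.0), View("home"), Purchase(200.0))
  // Keep only purchase amounts over 1000, in one pass.
  val bigAmounts = CollectSketch.collect(events) {
    case p: Purchase if p.amount > 1000 => p.amount
  }
  println(bigAmounts)
}
```

This is exactly the filter-and-map fusion that the {{flatMap}} workaround in the description performs by hand.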
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389832#comment-16389832 ]

Stephan Ewen commented on FLINK-8828:
--

[~fhueske] and [~twalthr] Please have a look at [~jelmer]'s comment here, I think he has a very good point.

Concerning the method name, could we use a method {{collectStream}} or {{collectElements}} or something like that? Alternatively, could we overload the {{collect}} method and rename the current {{collect()}} method eventually?
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384761#comment-16384761 ]

Jelmer Kuperus commented on FLINK-8828:
--

You are right. In fact, the existing method called {{collect()}} seems to be biting me :(. {{TableSourceITCase}} is not compiling because the implicit conversions defined in [org.apache.flink.table.api.scala._|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala] now consider the no-args collect method ambiguous, and there does not seem to be a good way to make this class compile without renaming the method I introduced. The problem is that Scala developers expect this method to be called {{collect}}.

As an aside, I think using implicit conversions the way [org.apache.flink.table.api.scala._|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala] does is frowned upon a little in the Scala world. A slightly more explicit approach, which would give fewer unexpected surprises like this one, might look something like

{noformat}
implicit class RichTable[T](table: Table) {

  def asRowDataSet: DataSet[Row] = {
    val tableEnv = table.tableEnv.asInstanceOf[ScalaBatchTableEnv]
    tableEnv.toDataSet[Row](table)
  }

  def asRowDataStream: DataStream[Row] = {
    val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
    tableEnv.toAppendStream[Row](table)
  }
}
{noformat}

Then this code

{noformat}
val results = tEnv.scan("T")
  .select('name, 'rtime, 'val)
  .collect()
{noformat}

could be rewritten as

{noformat}
val results = tEnv.scan("T")
  .select('name, 'rtime, 'val)
  .asRowDataSet
  .collect()
{noformat}
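The explicit-conversion pattern above can be illustrated without any Flink dependencies. Below is a self-contained sketch with a stand-in {{Table}} type (the real {{ScalaBatchTableEnv}}/{{toDataSet}} machinery is Flink-internal and omitted here):

```scala
// Stand-in for org.apache.flink.table.api.Table, for illustration only.
case class Table(rows: List[String])

object ExplicitConversions {
  // An implicit class adds named extension methods, so the conversion is
  // visible at the call site (.asRowList). An implicit def, by contrast,
  // silently turns a Table into another type, which is what caused the
  // ambiguous-collect surprise described above.
  implicit class RichTable(val table: Table) extends AnyVal {
    def asRowList: List[String] = table.rows
  }
}

object ConversionDemo extends App {
  import ExplicitConversions._
  val t = Table(List("a", "b"))
  // The caller opts in by name; nothing happens implicitly.
  println(t.asRowList)
}
```

The design point is discoverability: the conversion still uses an implicit, but it can only be triggered by writing the method name, never by the compiler searching for a way to make an unrelated call compile.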
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384046#comment-16384046 ]

ASF GitHub Bot commented on FLINK-8828:
--

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5616

    need to add to java API as well
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383491#comment-16383491 ]

Stephan Ewen commented on FLINK-8828:
--

Interesting suggestion, and it looks like a pretty lightweight, self-contained addition, so that's nice!

One thing I would raise is the name "collect". The DataSet API has "collect" as 'pull the data set back to the client', and the streaming API has an experimental feature that does the same for a data stream, also using the name "collect"; see {{org.apache.flink.streaming.api.datastream.DataStreamUtils}}.
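The clash is easy to reproduce outside Flink. The toy {{MiniSet}} below (a hypothetical type, not Flink code) puts both methods side by side: the overloads coexist syntactically, but the same name now covers an action and a transformation, which is the behavioral ambiguity being raised:

```scala
// Toy stand-in illustrating the naming clash: the existing DataSet.collect()
// is an action that pulls results back to the client, while the proposed
// collect(pf) is a transformation that builds a new (lazy) data set.
class MiniSet[A](elems: Seq[A]) {
  // Existing @Public semantics: materialize the data on the client.
  def collect(): Seq[A] = elems

  // Proposed semantics: filter-and-map via a partial function.
  def collect[B](pf: PartialFunction[A, B]): MiniSet[B] =
    new MiniSet(elems.collect(pf))
}

object ClashDemo extends App {
  val ds = new MiniSet(Seq(1, 2, 3))
  println(ds.collect())                                        // action
  println(ds.collect { case i if i > 1 => i * 10 }.collect())  // transform, then action
}
```

Both calls compile because the arities differ, but a reader scanning a pipeline can no longer tell from the name alone whether a line triggers execution or merely defines another operator.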
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383391#comment-16383391 ]

ASF GitHub Bot commented on FLINK-8828:
--

GitHub user jelmerk opened a pull request:

    https://github.com/apache/flink/pull/5616

    [FLINK-8828] [stream, dataset, scala] Introduce collect method

    ## What is the purpose of the change

    A collect function is a method that takes a partial function as its parameter and applies it to all the elements in the collection to create a new collection of the elements on which the partial function is defined. It makes certain things nicer to express.

    ## Brief change log

    - added collect method on scala dataset and datastream api

    ## Verifying this change

    It seems to be hard to find a place where this could be tested in isolation, suggestions welcome.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
    - The serializers: don't know
    - The runtime per-record code paths (performance sensitive): yes
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? yes
    - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jelmerk/flink collect_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5616.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5616

commit 61b4bcb7c941950d62d0db0aa2041f1796fdbaaf
Author: Jelmer Kuperus
Date: 2018-03-02T09:17:54Z

    [FLINK-8828] [stream, dataset, scala] Introduce collect method