[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
[ https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470264#comment-16470264 ] ASF GitHub Bot commented on FLINK-8649: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5479 > Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo > > > Key: FLINK-8649 > URL: https://issues.apache.org/jira/browse/FLINK-8649 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 1.4.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Trivial > Fix For: 1.5.0 > > > This is {{StreamExecutionEnvironment.createInput}} in the Scala API: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat)) > {code} > It should pass on the implicitly got {{TypeInformation}} to Java like this: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat, > implicitly[TypeInformation[T]])) > {code} > The current situation creates a problem, for example, when we have generics > in the type like in the following code, where the Java API can't deduce the > {{TypeInformation}} on its own: > {code} > > StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, > Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
[ https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407874#comment-16407874 ] ASF GitHub Bot commented on FLINK-8649: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5479 merging. > Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo > > > Key: FLINK-8649 > URL: https://issues.apache.org/jira/browse/FLINK-8649 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 1.4.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Trivial > Fix For: 1.5.0 > > > This is {{StreamExecutionEnvironment.createInput}} in the Scala API: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat)) > {code} > It should pass on the implicitly got {{TypeInformation}} to Java like this: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat, > implicitly[TypeInformation[T]])) > {code} > The current situation creates a problem, for example, when we have generics > in the type like in the following code, where the Java API can't deduce the > {{TypeInformation}} on its own: > {code} > > StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, > Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
[ https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396856#comment-16396856 ] ASF GitHub Bot commented on FLINK-8649: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/5479 Sure, I've added a test. > Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo > > > Key: FLINK-8649 > URL: https://issues.apache.org/jira/browse/FLINK-8649 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 1.4.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Trivial > Fix For: 1.5.0 > > > This is {{StreamExecutionEnvironment.createInput}} in the Scala API: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat)) > {code} > It should pass on the implicitly got {{TypeInformation}} to Java like this: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat, > implicitly[TypeInformation[T]])) > {code} > The current situation creates a problem, for example, when we have generics > in the type like in the following code, where the Java API can't deduce the > {{TypeInformation}} on its own: > {code} > > StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, > Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
[ https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392854#comment-16392854 ] ASF GitHub Bot commented on FLINK-8649: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5479 Looks good. Can you add a test for a type where Java type extraction fails, but passing the type info directly from Scala works? > Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo > > > Key: FLINK-8649 > URL: https://issues.apache.org/jira/browse/FLINK-8649 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 1.4.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Trivial > Fix For: 1.5.0 > > > This is {{StreamExecutionEnvironment.createInput}} in the Scala API: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat)) > {code} > It should pass on the implicitly got {{TypeInformation}} to Java like this: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat, > implicitly[TypeInformation[T]])) > {code} > The current situation creates a problem, for example, when we have generics > in the type like in the following code, where the Java API can't deduce the > {{TypeInformation}} on its own: > {code} > > StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, > Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
[ https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363644#comment-16363644 ] ASF GitHub Bot commented on FLINK-8649: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5479 Thank you @ggevay. I will look at this soon. > Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo > > > Key: FLINK-8649 > URL: https://issues.apache.org/jira/browse/FLINK-8649 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 1.4.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Trivial > Fix For: 1.5.0 > > > This is {{StreamExecutionEnvironment.createInput}} in the Scala API: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat)) > {code} > It should pass on the implicitly got {{TypeInformation}} to Java like this: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat, > implicitly[TypeInformation[T]])) > {code} > The current situation creates a problem, for example, when we have generics > in the type like in the following code, where the Java API can't deduce the > {{TypeInformation}} on its own: > {code} > > StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, > Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
[ https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362771#comment-16362771 ] ASF GitHub Bot commented on FLINK-8649: --- GitHub user ggevay opened a pull request: https://github.com/apache/flink/pull/5479 [FLINK-8649] [scala api] Pass on TypeInfo in StreamExecutionEnvironment.createInput ## What is the purpose of the change In the Scala API, modify `StreamExecutionEnvironment.createInput` to pass through the implicitly got TypeInfo to the java `createInput` call. With the old code, the Java API tries to figure out the TypeInfo on its own, which doesn't always work as well as the TypeInfo creation of the Scala API. Note that if the input format implements `ResultTypeQueryable`, then we would like to retain the old behaviour, so we just do the call into the Java API in the old way in this case. (`ResultTypeQueryable` can sometimes give a better type than the Scala TypeInfo creation, e.g. when the result type depends on parametrization of the input format.) ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no You can merge this pull request into a Git repository by running: $ git pull https://github.com/ggevay/flink createInput-typeInfo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5479 commit d03b67feed45c07f8b886eca1042781d658b1af0 Author: Gabor Gevay Date: 2018-02-13T17:09:05Z [FLINK-8649] [scala api] Pass on TypeInfo in StreamExecutionEnvironment.createInput > Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo > > > Key: FLINK-8649 > URL: https://issues.apache.org/jira/browse/FLINK-8649 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 1.4.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Trivial > Fix For: 1.5.0 > > > This is {{StreamExecutionEnvironment.createInput}} in the Scala API: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat)) > {code} > It should pass on the implicitly got {{TypeInformation}} to Java like this: > {code} > def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): > DataStream[T] = > asScalaStream(javaEnv.createInput(inputFormat, > implicitly[TypeInformation[T]])) > {code} > The current situation creates a problem, for example, when we have generics > in the type like in the following code, where the Java API can't deduce the > {{TypeInformation}} on its own: > {code} > > StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, > Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)