[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923887#comment-15923887 ]

ASF GitHub Bot commented on FLINK-6039:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3529#discussion_r105862926

    --- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
    @@ -66,10 +66,11 @@ public int getArity() {
          * Gets the field at the specified position.
          * @param pos The position of the field, 0-based.
          * @return The field at the specified position.
    -     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
    +     * Return null if the position is equal to, or larger than the number of fields.
    +     * @throws IndexOutOfBoundsException Thrown, if the position is negative.
          */
         public Object getField(int pos) {
    -        return fields[pos];
    +        return pos >= fields.length ? null : fields[pos];
    --- End diff --

    As I said before, I think the default should be type-safe. If a TableFunction wants to relax its result type, it should indicate this explicitly, in my opinion, either through a dedicated TableFunction or a marker interface.

> Row of TableFunction should support flexible number of fields
> -------------------------------------------------------------
>
>                 Key: FLINK-6039
>                 URL: https://issues.apache.org/jira/browse/FLINK-6039
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Zhuoluo Yang
>            Assignee: Zhuoluo Yang
>
> In the real world, especially when processing logs with a TableFunction, log formats are flexible. Thus, the number of fields should not be fixed.
> For example, we should make the following three TableFunctions work.
> {code}
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.table.functions.TableFunction
> import org.apache.flink.types.Row
>
> // Test for incomplete row: the arity matches the result type, but only one field is set
> class TableFunc4 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(3)
>         row.setField(0, s) // only the first of the three columns is set
>         collect(row)
>       })
>     }
>   }
>
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
>
> // Test for short row: the arity is smaller than the result type
> class TableFunc5 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(1) // the result type has three columns, but the row has only one
>         row.setField(0, s)
>         collect(row)
>       })
>     }
>   }
>
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
>
> // Test for overflow row: the arity is larger than the result type
> class TableFunc6 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(5) // the result type has two columns, but the row has five
>         row.setField(0, s)
>         row.setField(1, s.length)
>         row.setField(2, s.length)
>         row.setField(3, s.length)
>         row.setField(4, s.length)
>         collect(row)
>       })
>     }
>   }
>
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> {code}
> Actually, TableFunc4 and TableFunc6 already work correctly with the current version. This issue will make TableFunc5 work.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
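A rough sketch of the type-safe default suggested in the review comment above, with an explicit opt-in through a marker interface. `FlexibleArity`, `ArityValidator`, `StrictSplitFunc` and `LenientSplitFunc` are hypothetical names used only for illustration, not Flink APIs, and the check is assumed to run once per emitted row against the arity of the declared result type:

```java
/** Hypothetical marker interface (not a Flink API): implementing it opts a function in to flexible row arity. */
interface FlexibleArity {}

/** Hypothetical per-row check: strict by default, relaxed only for functions that opt in. */
final class ArityValidator {
    private ArityValidator() {}

    static void validate(Object function, int declaredArity, int actualArity) {
        // A mismatched arity is tolerated only when the function explicitly implements FlexibleArity.
        if (actualArity != declaredArity && !(function instanceof FlexibleArity)) {
            throw new IllegalStateException(
                function.getClass().getSimpleName() + " emitted a row with " + actualArity
                    + " fields, but its result type declares " + declaredArity
                    + "; implement FlexibleArity to allow this");
        }
    }
}

/** Hypothetical function that keeps the strict default. */
class StrictSplitFunc {}

/** Hypothetical function that opts in explicitly, e.g. a log splitter like TableFunc5. */
class LenientSplitFunc implements FlexibleArity {}
```

With this design, a function such as `TableFunc5` would only be accepted if it implemented the marker interface; every other function keeps failing fast on a schema mismatch.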
[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923886#comment-15923886 ]

Zhuoluo Yang commented on FLINK-6039:
-------------------------------------

Hi [~fhueske], thank you for your comments. I'm not sure whether this should be the default behavior. Maybe a property, an annotation, or a subclass would be a better design.
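The annotation variant mentioned above could be sketched as follows. `@FlexibleRowArity` and `FlexibleArityPolicy` are hypothetical (not Flink APIs), as are the two example classes; the annotation needs runtime retention so that it can be read via reflection, and the lookup is assumed to happen once per function class rather than once per row:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical opt-in annotation; RUNTIME retention makes it visible to reflection. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface FlexibleRowArity {}

final class FlexibleArityPolicy {
    private FlexibleArityPolicy() {}

    /** Reads the opt-in flag from the function class, e.g. once when the plan is translated. */
    static boolean allowsFlexibleArity(Class<?> functionClass) {
        return functionClass.isAnnotationPresent(FlexibleRowArity.class);
    }
}

/** Hypothetical function that opts in via the annotation. */
@FlexibleRowArity
class AnnotatedLogSplitter {}

/** Hypothetical function that keeps the strict default. */
class PlainLogSplitter {}
```

Compared with a marker interface, an annotation leaves the class hierarchy unchanged, but the opt-in can only be detected through reflection rather than an `instanceof` check.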
[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923882#comment-15923882 ]

ASF GitHub Bot commented on FLINK-6039:
---------------------------------------

Github user clarkyzl commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3529#discussion_r105861507

    Hi @fhueske, thank you for your comments. The additional `if` does add overhead here, so this may not be an efficient approach. IMHO, the design of `TableFunction` supports arbitrary types of output `Row`s. However, too many kinds of `TableFunction`s may also confuse users. I will look into whether there is a concise and efficient way.
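One way to avoid the per-access `if` discussed above is to conform each row once, when it is emitted, instead of inside `Row#getField`. The sketch below assumes rows are plain `Object[]` arrays and uses a hypothetical `ConformingCollector` built on `java.util.function.Consumer` rather than Flink's `Collector`. Padding short rows with nulls and dropping surplus fields is an assumption about the intended semantics, chosen to match the two cases the issue says already work (`TableFunc4` and `TableFunc6`):

```java
import java.util.Arrays;
import java.util.function.Consumer;

/**
 * Hypothetical collector wrapper (not Flink's Collector API): conforms each emitted row
 * to the declared arity once, so downstream field access remains plain array indexing.
 */
final class ConformingCollector implements Consumer<Object[]> {
    private final int declaredArity;
    private final Consumer<Object[]> downstream;

    ConformingCollector(int declaredArity, Consumer<Object[]> downstream) {
        this.declaredArity = declaredArity;
        this.downstream = downstream;
    }

    @Override
    public void accept(Object[] fields) {
        // Fast path: a row that already matches the schema is forwarded untouched.
        // Otherwise Arrays.copyOf pads a short row with nulls or truncates a long one.
        downstream.accept(fields.length == declaredArity
            ? fields
            : Arrays.copyOf(fields, declaredArity));
    }
}
```

The cost is one length comparison per emitted row, plus a copy only for rows that actually violate the schema.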
[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923859#comment-15923859 ]

ASF GitHub Bot commented on FLINK-6039:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3529#discussion_r105856276

    This will cause overhead for basically every operation and should not be done to support a minor feature. If you expect that you might receive a `Row` which violates the expected schema and you want to avoid an `IndexOutOfBoundsException`, you should check `getArity()` instead.
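On the consumer side, the `getArity()` check suggested above could look like this. `MiniRow` is a hypothetical stand-in that mirrors only the parts of `org.apache.flink.types.Row` needed here (`getArity()`, `getField(int)`, `setField(int, Object)`), and `RowAccess.fieldOrNull` is an illustrative helper, not an existing API:

```java
/** Hypothetical stand-in for org.apache.flink.types.Row, reduced to what this example needs. */
final class MiniRow {
    private final Object[] fields;

    MiniRow(int arity) {
        this.fields = new Object[arity];
    }

    int getArity() {
        return fields.length;
    }

    /** Unchanged accessor: plain array access, no extra branch on the hot path. */
    Object getField(int pos) {
        return fields[pos];
    }

    void setField(int pos, Object value) {
        fields[pos] = value;
    }
}

final class RowAccess {
    private RowAccess() {}

    /** Hypothetical helper: only callers that expect schema-violating rows pay for the check. */
    static Object fieldOrNull(MiniRow row, int pos) {
        return pos < row.getArity() ? row.getField(pos) : null;
    }
}
```

`getField` itself keeps throwing an `IndexOutOfBoundsException` for out-of-range positions, so rows that violate the schema are still detected everywhere else.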
[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923836#comment-15923836 ]

Fabian Hueske commented on FLINK-6039:
--------------------------------------

Can you explain the intended semantics when the returned rows are too short or too long?
I'm also not sure whether this should be the default behavior, or whether we should instead add a special type of {{TableFunction}} for this.
[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923831#comment-15923831 ]

ASF GitHub Bot commented on FLINK-6039:
---------------------------------------

Github user clarkyzl commented on the issue:

    https://github.com/apache/flink/pull/3529

    I didn't change the behavior of `org.apache.flink.types.Record`. I think we can open another issue if necessary.
[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923820#comment-15923820 ]

ASF GitHub Bot commented on FLINK-6039:
---------------------------------------

GitHub user clarkyzl opened a pull request:

    https://github.com/apache/flink/pull/3529

    [FLINK-6039] [core] Row of TableFunction should support flexible number of fields

    Type: Improvement
    Priority: Major
    Components: core, table, udtf

    Problem Definition: The Row of a TableFunction should support a flexible number of fields; the number of fields should not be fixed. The framework should allow the number of fields in the emitted `Row`s to differ from the number of fields declared by the `TypeInformation` of the result type.

    Design: Before the patch, accessing a position larger than or equal to the number of fields in a `Row` throws an `IndexOutOfBoundsException`. After the patch, it returns null.

    Impact Analysis: The behavior of `Row#getField` changes.

    Test: `mvn clean verify` has been run. The cases `TableFunc4` and `TableFunc6` pass with or without this patch; they are included to make sure these cases keep working. The case `TableFunc5` passes only once the patch is applied.
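To make the proposed change concrete, the following sketch models `Row#getField` before and after the patch on a plain `Object[]`. `GetFieldModel` is a hypothetical name; only the two method bodies are taken from the diff in this pull request:

```java
/** Hypothetical model of Row#getField; the method bodies mirror the diff in PR #3529. */
final class GetFieldModel {
    private GetFieldModel() {}

    /** Before the patch: any position outside [0, length) throws. */
    static Object getFieldBefore(Object[] fields, int pos) {
        return fields[pos];
    }

    /**
     * After the patch: a position >= length yields null, while a negative position still
     * throws ArrayIndexOutOfBoundsException (a subclass of IndexOutOfBoundsException).
     */
    static Object getFieldAfter(Object[] fields, int pos) {
        return pos >= fields.length ? null : fields[pos];
    }
}
```

The patched version still throws for a negative position, as the updated Javadoc states, because the remaining array access is bounds-checked by the JVM.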
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clarkyzl/flink flink-6039

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3529.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3529

commit 9814ec01ed0f8031f055bea63e01d713f56945a2
Author: Zhuoluo Yang
Date:   2017-03-14T08:44:02Z

    [FLINK-6039] [core] Row of TableFunction should support flexible number of fields