[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923887#comment-15923887
 ] 

ASF GitHub Bot commented on FLINK-6039:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3529#discussion_r105862926
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -66,10 +66,11 @@ public int getArity() {
 * Gets the field at the specified position.
 * @param pos The position of the field, 0-based.
 * @return The field at the specified position.
-* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+* Return null if the position is equal to, or larger than the number of fields.
+* @throws IndexOutOfBoundsException Thrown, if the position is negative.
 */
public Object getField(int pos) {
-   return fields[pos];
+   return pos >= fields.length ? null : fields[pos];
--- End diff --

As I said before, I think the default should be type-safe. If a 
TableFunction would like to relax its type, this should be explicitly indicated, 
in my opinion, either through a dedicated TableFunction or a marker interface.
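
One way to picture the marker-interface option is a check that relaxes the arity rule only for functions that opt in. This is a rough sketch, not Flink code: `FlexibleArity`, `RowFunction`, and `conform` are hypothetical names, and padding short rows with null while truncating long ones is an assumed semantics that the thread has not settled.

```java
import java.util.Arrays;

final class MarkerInterfaceSketch {

    /** Hypothetical marker: implementing it opts a function in to relaxed arity. */
    interface FlexibleArity {}

    /** Hypothetical stand-in for a table function that declares its result arity. */
    interface RowFunction {
        int declaredArity();
    }

    /** Strict by default: does not implement the marker. */
    static final class StrictFunction implements RowFunction {
        public int declaredArity() {
            return 3;
        }
    }

    /** Explicitly opts in to flexible arity through the marker. */
    static final class RelaxedFunction implements RowFunction, FlexibleArity {
        public int declaredArity() {
            return 3;
        }
    }

    /**
     * Returns the emitted fields adjusted to the declared arity.
     * Assumed semantics: short rows are padded with null and long rows are
     * truncated, but only for functions carrying the marker. All other
     * functions fail fast on a mismatch.
     */
    static Object[] conform(RowFunction fn, Object[] emitted) {
        int expected = fn.declaredArity();
        if (emitted.length == expected) {
            return emitted;
        }
        if (fn instanceof FlexibleArity) {
            // Arrays.copyOf pads with null or truncates to the requested length.
            return Arrays.copyOf(emitted, expected);
        }
        throw new IllegalArgumentException(
            "Expected " + expected + " fields but got " + emitted.length);
    }

    public static void main(String[] args) {
        Object[] shortRow = {"a"};
        System.out.println(Arrays.toString(conform(new RelaxedFunction(), shortRow)));
        try {
            conform(new StrictFunction(), shortRow);
        } catch (IllegalArgumentException e) {
            System.out.println("strict function rejected the row: " + e.getMessage());
        }
    }
}
```

With this shape the hot path of `Row#getField` stays untouched, and only functions that carry the marker pay for the adjustment.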


> Row of TableFunction should support flexible number of fields
> -
>
> Key: FLINK-6039
> URL: https://issues.apache.org/jira/browse/FLINK-6039
> Project: Flink
> Issue Type: Improvement
> Reporter: Zhuoluo Yang
> Assignee: Zhuoluo Yang
>
> In the real world, log formats are flexible, especially when logs are 
> processed with a TableFunction. Thus, the number of fields should not be 
> fixed. 
> For example, the following three kinds of TableFunction should work.
> {code}
> // Test for incomplete row
> class TableFunc4 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(3)
>         row.setField(0, s)  // And we only set values for one column
>         collect(row)
>       })
>     }
>   }
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> // Test for incomplete row
> class TableFunc5 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(1)  // ResultType is three columns, we have only one here
>         row.setField(0, s)
>         collect(row)
>       })
>     }
>   }
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> // Test for overflow row
> class TableFunc6 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(5)  // ResultType is two columns, we have five columns here
>         row.setField(0, s)
>         row.setField(1, s.length)
>         row.setField(2, s.length)
>         row.setField(3, s.length)
>         row.setField(4, s.length)
>         collect(row)
>       })
>     }
>   }
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> {code}
> Actually, TableFunc4 and TableFunc6 already work correctly with the 
> current version. This issue will make TableFunc5 work.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread Zhuoluo Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923886#comment-15923886
 ] 

Zhuoluo Yang commented on FLINK-6039:
-

Hi [~fhueske], thank you for your comments. I'm not sure whether this should be 
the default behavior. Maybe a property, an annotation, or a subclass could be an 
alternative design.



[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923882#comment-15923882
 ] 

ASF GitHub Bot commented on FLINK-6039:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3529#discussion_r105861507
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -66,10 +66,11 @@ public int getArity() {
 * Gets the field at the specified position.
 * @param pos The position of the field, 0-based.
 * @return The field at the specified position.
-* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+* Return null if the position is equal to, or larger than the number of fields.
+* @throws IndexOutOfBoundsException Thrown, if the position is negative.
 */
public Object getField(int pos) {
-   return fields[pos];
+   return pos >= fields.length ? null : fields[pos];
--- End diff --

Hi @fhueske, thank you for your comments. The additional `if` does cause 
overhead here, so it may not be an efficient approach. IMHO, the design of 
`TableFunction` supports arbitrary types of output `Row`. However, too many kinds 
of `TableFunction`s may also confuse users. I will investigate whether there is a 
concise and efficient way.




[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923859#comment-15923859
 ] 

ASF GitHub Bot commented on FLINK-6039:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3529#discussion_r105856276
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -66,10 +66,11 @@ public int getArity() {
 * Gets the field at the specified position.
 * @param pos The position of the field, 0-based.
 * @return The field at the specified position.
-* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+* Return null if the position is equal to, or larger than the number of fields.
+* @throws IndexOutOfBoundsException Thrown, if the position is negative.
 */
public Object getField(int pos) {
-   return fields[pos];
+   return pos >= fields.length ? null : fields[pos];
--- End diff --

This will cause overhead for basically every operation and should not be 
done to support a minor feature.
If you expect that you might receive a `Row` which violates the expected 
schema and you want to avoid an `IndexOutOfBoundsException` you should rather 
check `getArity()`.
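
A caller-side arity check along these lines might look as follows. This is a minimal sketch: `SimpleRow` is a hypothetical stand-in for `org.apache.flink.types.Row`, reduced to `getArity()`, `getField(int)`, and `setField(int, Object)` so that it runs without Flink, and `fieldOrNull` is an illustrative helper, not an existing API.

```java
final class ArityCheckExample {

    /**
     * Hypothetical caller-side guard: returns null for positions past the end
     * of a short row instead of letting getField throw.
     */
    static Object fieldOrNull(SimpleRow row, int pos) {
        return pos < row.getArity() ? row.getField(pos) : null;
    }

    public static void main(String[] args) {
        SimpleRow row = new SimpleRow(1);
        row.setField(0, "a");
        System.out.println(fieldOrNull(row, 0)); // field is present
        System.out.println(fieldOrNull(row, 2)); // past the end of the row
    }
}

/** Hypothetical stand-in for Row that keeps the unpatched getField behavior. */
final class SimpleRow {
    private final Object[] fields;

    SimpleRow(int arity) {
        this.fields = new Object[arity];
    }

    int getArity() {
        return fields.length;
    }

    /** Throws an IndexOutOfBoundsException for any out-of-range position. */
    Object getField(int pos) {
        return fields[pos];
    }

    void setField(int pos, Object value) {
        fields[pos] = value;
    }
}
```

Only code that expects schema violations pays for the comparison. Every other `getField` call keeps its single array access.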




[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923836#comment-15923836
 ] 

Fabian Hueske commented on FLINK-6039:
--

Can you explain the semantics if the returned rows are too short or too long?
I'm also not sure if this should be the default behavior or whether we should 
rather add a special type of {{TableFunction}} for this.
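
To make the two cases in the question concrete, here is a small sketch using plain arrays instead of `Row`. It assumes a consumer that reads exactly the declared number of fields by position. That is an assumption made for illustration, not a description of the Flink runtime, and `readDeclared` is a hypothetical helper.

```java
final class ArityMismatchDemo {

    /**
     * Copies exactly declaredArity fields by position, the way a consumer that
     * trusts the declared result type might (an assumption for this sketch).
     */
    static Object[] readDeclared(Object[] emitted, int declaredArity) {
        Object[] out = new Object[declaredArity];
        for (int i = 0; i < declaredArity; i++) {
            out[i] = emitted[i]; // throws if the emitted row is too short
        }
        return out;
    }

    public static void main(String[] args) {
        // Too long: five fields emitted, two declared (the TableFunc6 shape).
        Object[] tooLong = {"ab", 2, 2, 2, 2};
        System.out.println(readDeclared(tooLong, 2).length);

        // Too short: one field emitted, three declared (the TableFunc5 shape).
        Object[] tooShort = {"ab"};
        try {
            readDeclared(tooShort, 3);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("short row failed: " + e);
        }
    }
}
```

Under that assumption, a row that is too long silently loses its extra fields, while a row that is too short fails at the first missing position.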



[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923831#comment-15923831
 ] 

ASF GitHub Bot commented on FLINK-6039:
---

Github user clarkyzl commented on the issue:

https://github.com/apache/flink/pull/3529
  
I didn't change the behavior of `org.apache.flink.types.Record`. I think we 
can open another issue if necessary.




[jira] [Commented] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923820#comment-15923820
 ] 

ASF GitHub Bot commented on FLINK-6039:
---

GitHub user clarkyzl opened a pull request:

https://github.com/apache/flink/pull/3529

[FLINK-6039] [core] Row of TableFunction should support flexible number of 
fields

Type: Improvement
Priority: Major
Components: core, table, udtf
Problem Definition: Row of TableFunction should support a flexible number of 
fields. The number of fields should not be fixed. The framework should allow 
the number of fields in a `Row` to differ from the number of 
`TypeInformation` entries in the `ResultType`.
Design:
Before the patch, accessing a position larger than or equal to the number 
of fields in a `Row` throws an `IndexOutOfBoundsException`. After the patch, 
it returns null.
Impact Analysis:
The behavior of `Row#getField` has changed.
Test:
`mvn clean verify` has been run.
The `TableFunc4` and `TableFunc6` cases pass with or without this patch. 
They are included to make sure those cases keep working.
The `TableFunc5` case succeeds only after the patch is applied.
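
The behavior change to `Row#getField` can be reproduced in isolation. The sketch below mirrors the two one-line bodies from the diff against a plain `Object[]`. The class and method names (`GetFieldBeforeAfter`, `getFieldBefore`, `getFieldAfter`) are hypothetical and exist only for this illustration.

```java
final class GetFieldBeforeAfter {

    /** Body before the patch: any out-of-range position throws. */
    static Object getFieldBefore(Object[] fields, int pos) {
        return fields[pos];
    }

    /**
     * Body after the patch: positions past the end yield null, while negative
     * positions still throw because they reach the array access.
     */
    static Object getFieldAfter(Object[] fields, int pos) {
        return pos >= fields.length ? null : fields[pos];
    }

    public static void main(String[] args) {
        Object[] fields = {"a"};
        System.out.println(getFieldAfter(fields, 2));
        try {
            getFieldBefore(fields, 2);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("before the patch: " + e);
        }
    }
}
```

Note that the patched version adds one comparison to every call, including calls on rows that match their schema.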

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/clarkyzl/flink flink-6039

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3529.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3529


commit 9814ec01ed0f8031f055bea63e01d713f56945a2
Author: Zhuoluo Yang 
Date:   2017-03-14T08:44:02Z

[FLINK-6039] [core] Row of TableFunction should support flexible number of 
fields



