Hi Fabian. Yes I think that was what I missed. I haven't looked into the
code but just inferring from the translated plan pasted by Henry.

Let me try to take a look and put in a fix for this.

Thanks,
Rong

On Mon, Oct 1, 2018, 7:28 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> I had a look into the code. From what I saw, we are translating the values
> into Rows.
> The problem here is that the IN clause is translated into a join and that
> the join results contains a time attribute field. This is a safety
> restriction to ensure that time attributes do not lose their watermark
> alignment because joins can return their results in random order. This
> should be related to or same as [1].
>
> Anyway, we should not translate IN clauses to joins for incrementally
> evaluated queries (aka. streaming queries).
> The main problem here is that the join materializes both inputs which is
> fine for the VALUES input but not for the "stream".
> I created FLINK-10474 to fix the problem.
>
> A workaround for the problem could be a user-defined scalar function that
> replaces the IN clause.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-10211
> [2] https://issues.apache.org/jira/browse/FLINK-10474
>
> Am Mo., 1. Okt. 2018 um 10:01 Uhr schrieb Timo Walther <twal...@apache.org
> >:
>
>> Hi,
>>
>> tuple should not be used anywhere in flink-table. @Rong can you point us
>> to the corresponding code? I haven't looked into the code but we should
>> definitely support this query. @Henry feel free to open an issue for it.
>>
>> Regards,
>> Timo
>>
>>
>> Am 28.09.18 um 19:14 schrieb Rong Rong:
>>
>> Yes.
>>
>> Thanks for bringing this up Hequn! :-) I think Tuple would not be the
>> best container to use.
>>
>> However, in search for alternative, shouldn't Collection / List be a more
>> suitable solution? Row seems to not fit in the context (as there can be
>> Rows with elements of different type).
>> I vaguely recall there was similar JIRA but might not be related to IN
>> clause. Let me try to dig it up.
>>
>> --
>> Rong
>>
>> On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I haven't look into the code. If this is limited by Tuple, would it
>>> better to implement it with Row?
>>>
>>> Best, Hequn
>>>
>>> On Fri, Sep 28, 2018 at 9:27 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Henry, Vino.
>>>>
>>>> I think IN operator was translated into either a RexSubQuery or a
>>>> SqlStdOperatorTable.IN operator.
>>>> I think Vino was referring to the first case.
>>>> For the second case (I think that's what you are facing here), they are
>>>> converted into tuples and the maximum we currently have in Flink was
>>>> Tuple25.java, I was wondering if that was the issue you are facing.
>>>> You can probably split the IN into many IN combining with OR.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> On Fri, Sep 28, 2018 at 2:33 AM vino yang <yanghua1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Henry,
>>>>>
>>>>> Maybe the number of elements in your IN clause is out of range? Its
>>>>> default value is 20, you can modify it with this configuration item:
>>>>>
>>>>> *withInSubQueryThreshold(XXX)*
>>>>>
>>>>> This API comes from Calcite.
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> 徐涛 <happydexu...@gmail.com> 于2018年9月28日周五 下午4:23写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>     When I am executing the following SQL in flink 1.6.1, some error 
>>>>>> throws out saying that it has a support issue, but when I reduce the 
>>>>>> number of integers in the “in” sentence, for example,
>>>>>>
>>>>>>     trackId in (124427150,71648998) , Flink does not complain anything, 
>>>>>> so I wonder is there any length limit in “in” operation?
>>>>>>
>>>>>> Thanks a lot.
>>>>>>
>>>>>> SELECT
>>>>>>     trackId as id,track_title as description, count(*) as cnt
>>>>>> FROM
>>>>>>     play
>>>>>> WHERE
>>>>>>     appName='play.statistics.trace' and
>>>>>>     trackId in 
>>>>>> (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
>>>>>> GROUP BY
>>>>>>     HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
>>>>>> MINUTE),trackId,track_title;
>>>>>>
>>>>>>
>>>>>>
>>>>>> FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>>>>>>   FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2],
>>>>>> trackId=[$t0], track_title=[$t1])
>>>>>>     FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
>>>>>>       FlinkLogicalCalc(expr#0..4=[{inputs}],
>>>>>> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
>>>>>> trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
>>>>>>         FlinkLogicalNativeTableScan(table=[[play]])
>>>>>>       FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, {
>>>>>> 124493327 }, { 524043 }, { 27300837 }, { 30300481 }, { 27300809 }, {
>>>>>> 124744768 }, { 45982512 }, { 124526566 }, { 124556427 }, { 124804208 }, {
>>>>>> 74302264 }, { 119588973 }, { 30496269 }, { 27300288 }, { 124098818 }, {
>>>>>> 125071530 }, { 120918746 }, { 124171456 }, { 30413034 }, { 124888075 }, {
>>>>>> 125270551 }, { 125434224 }, { 27300195 }, { 45982342 }, { 45982468 }, {
>>>>>> 45982355 }, { 65349883 }, { 124705962 }, { 65349905 }, { 124298305 }, {
>>>>>> 124889583 }, { 45982338 }, { 20506255 }, { 18556415 }, { 122161128 }, {
>>>>>> 27299018 }, { 122850375 }, { 124862362 }, { 45982336 }, { 59613202 }, {
>>>>>> 122991190 }, { 124590280 }, { 124867563 }, { 45982332 }, { 124515944 }, {
>>>>>> 20506257 }, { 122572115 }, { 92083574 }]])
>>>>>>
>>>>>> This exception indicates that the query uses an unsupported SQL
>>>>>> feature.
>>>>>> Please check the documentation for the set of currently supported SQL
>>>>>> features.
>>>>>> at
>>>>>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
>>>>>> at
>>>>>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
>>>>>> at
>>>>>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
>>>>>> at
>>>>>> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>>>>>> at
>>>>>> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
>>>>>> at
>>>>>> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
>>>>>> at
>>>>>> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>>>>>> at
>>>>>> com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:141)
>>>>>> at
>>>>>> com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:139)
>>>>>> at
>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>>>> at
>>>>>> com.ximalaya.flink.dsl.application.FlinkApplication$.main(FlinkApplication.scala:139)
>>>>>> at
>>>>>> com.ximalaya.flink.dsl.web.test.DslTestUtils$.executeDslFile(DslTestUtils.scala:69)
>>>>>> at
>>>>>> com.ximalaya.flink.dsl.web.test.PlayCountTest$.main(PlayCountTest.scala:5)
>>>>>> at
>>>>>> com.ximalaya.flink.dsl.web.test.PlayCountTest.main(PlayCountTest.scala)
>>>>>>
>>>>>> Best
>>>>>> Henry
>>>>>>
>>>>>
>>

Reply via email to