radu created FLINK-6077:
---------------------------

             Summary: Support In/Exists/Except/Any/Some/All for Stream SQL
                 Key: FLINK-6077
                 URL: https://issues.apache.org/jira/browse/FLINK-6077
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: radu


Time target: Proc Time

SQL targeted query examples:
----------------------------

With inner query

Q1.
```sql
SELECT client FROM stream1 WHERE id IN (
  SELECT id FROM stream2 WHERE salary > 4500 GROUP BY FLOOR(proctime TO HOUR), id
)
```

Comment: A concrete example of this query is selecting the customers
whose country is in the list of the suppliers' countries (`SELECT
customer FROM customers WHERE Country IN (SELECT Country FROM suppliers)`).

Comment: This implementation depends on the implementation of the inner
query. The structure can be the same as for inner query support with the
difference that the LogicalJoin between main query and inner query is
conditional.

Comment: The inner query needs a bound; otherwise it cannot be decided
when to trigger.

Comment: If the value is not triggered by the grouping expression, then
the inner query must be triggered based on when that expression changes
value.

Comment: Boundaries should be supported for all options: GROUP BY
clauses, windows, or time expressions (FLOOR/CEIL(rowtime/proctime TO
HOUR)).

With collection

Q2.
```sql
SELECT * FROM stream1 WHERE b IN (5000, 7000, 8000, 9000)
```

Comment: It should be checked whether this is already supported by the
DataStreamCalc implementation. If not, it can become a sub-JIRA task to
extend the DataStreamCalc functionality with this conditional behavior.

Comment: A similar functionality can be provided if the collection is a
table rather than a set of values.
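For a literal list, IN needs neither a window nor state: it reduces to a
per-row membership test, which is the kind of condition a Calc-style
filter evaluates. The sketch below is plain Java rather than Flink code
and only illustrates that semantics; `InListFilter` is a hypothetical
name, and rows are reduced to the single long field `b` from Q2. The
same predicate applies when the set is read from a static table instead
of literals.

```java
import java.util.List;
import java.util.Set;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;

// Hypothetical helper (not a Flink class): evaluates `b IN (v1, v2, ...)`
// as a stateless per-row predicate, the way a Calc-style filter would.
final class InListFilter implements LongPredicate {
    private final Set<Long> values;

    // The set may come from SQL literals or from a static table's column.
    InListFilter(Set<Long> values) {
        this.values = Set.copyOf(values); // immutable snapshot of the IN list
    }

    @Override
    public boolean test(long b) {
        return values.contains(b);
    }

    // Batch helper for the demo; a streaming filter calls test() once per row.
    List<Long> filter(List<Long> rows) {
        return rows.stream().filter(this::test).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // SELECT * FROM stream1 WHERE b IN (5000, 7000, 8000, 9000)
        InListFilter in = new InListFilter(Set.of(5000L, 7000L, 8000L, 9000L));
        System.out.println(in.filter(List.of(4000L, 5000L, 9000L, 9500L))); // [5000, 9000]
    }
}
```

Because the check needs no state, it can run at the same parallelism as
the rest of the Calc without any window or trigger.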

With table

```sql
SELECT client FROM stream1 WHERE id IN (
  SELECT id FROM table1 WHERE stream1.id = table1.id
)
```

Comment: This can be a sub-JIRA issue, perhaps within the context of
dynamic tables, to support joining with tables and filtering based on
the contents of an external table.


General comments: **Except** is similar in behavior to IN or EXISTS, as
it filters out outputs of the main stream based on data from a secondary
stream. The implementation will follow exactly the same logic as for
IN/EXISTS, filtering the outputs in the join function between the main
stream and the secondary stream. Additionally, the same restrictions
apply to the secondary/inner queries.

```sql
SELECT ID, NAME FROM CUSTOMERS LEFT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
EXCEPT
SELECT ID, NAME FROM CUSTOMERS RIGHT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
GROUP BY FLOOR(procTime TO HOUR);
```
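A minimal plain-Java sketch of EXCEPT evaluated for one window, assuming
both streams have already been cut into the same window: rows of the
main side that also appear on the secondary side are dropped, and
duplicates are removed because SQL EXCEPT (without ALL) has set
semantics. `WindowedExcept` and the `CustomerRow(id, name)` record are
hypothetical; records need Java 16 or later.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: evaluates `main EXCEPT secondary` over the rows
// that fell into the same window on both streams.
final class WindowedExcept {
    static Set<CustomerRow> except(List<CustomerRow> mainWindow, List<CustomerRow> secondaryWindow) {
        // Everything seen on the secondary side of this window (duplicates collapse).
        Set<CustomerRow> toRemove = Set.copyOf(secondaryWindow);
        // LinkedHashSet keeps arrival order and drops duplicates (set semantics).
        Set<CustomerRow> result = new LinkedHashSet<>();
        for (CustomerRow r : mainWindow) {
            if (!toRemove.contains(r)) {
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<CustomerRow> left = List.of(
            new CustomerRow(1, "a"), new CustomerRow(2, "b"),
            new CustomerRow(2, "b"), new CustomerRow(3, "c"));
        List<CustomerRow> right = List.of(new CustomerRow(2, "b"));
        // Keeps (1, a) and (3, c); both copies of (2, b) are removed.
        System.out.println(except(left, right));
    }
}

// Hypothetical row type for the example: (ID, NAME).
record CustomerRow(long id, String name) {}
```

Unlike IN, the result is decided by the absence of a match, so the whole
secondary side of the window must be known before any main row is
emitted.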


Description:
------------

The IN and EXISTS operators are conditional clauses in the WHERE clause
that check for values in certain collections. The collection against
which the values are checked can either be static (values, tables) or
part of a stream. This JIRA issue is concerned with the latter case of
checking for values over a stream. For this operation to work, the
stream needs to be bounded so that the result can be triggered and the
collection can be formed. This points to using boundaries or groupings
over the sub-query that forms the collection to which IN is applied.
This should be supported via 3 options as shown below. Each of these
options can be a sub-JIRA issue.


1)  Group By clauses that are applied over some monotonic order of the
    stream based on which ranges are defined.

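`   [...] GROUP BY prodId`

2)  Time expressions that bucket the stream into fixed time units
    (FLOOR/CEIL over rowtime/proctime).

`   [...] GROUP BY FLOOR(procTime TO HOUR)`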

3)  Window clauses to define rolling partitions of the data of the
    stream, which evolve in time.

`    [...] WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3) PRECEDING);`
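The window option differs from the hourly grouping in that the
collection slides with time. A plain-Java sketch, assuming
processing-time timestamps in milliseconds that never decrease across
both streams: the inner stream's ids are kept for the range preceding
each main-stream element (here 10 minutes, inclusive of the lower bound,
as with RANGE ... PRECEDING), and older ids are evicted before each
check. `SlidingInSet` is a hypothetical name and the deque-plus-counter
bookkeeping is one possible design, not Flink's window operator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps the inner-query collection over a sliding
// processing-time range and answers IN checks against it.
final class SlidingInSet {
    private final long rangeMillis;
    // Inner-stream entries in arrival order: {timestamp, id}.
    private final Deque<long[]> entries = new ArrayDeque<>();
    // Multiplicity of each id currently inside the range.
    private final Map<Long, Integer> counts = new HashMap<>();

    SlidingInSet(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    // Inner stream: record an id observed at time ts (timestamps must not decrease).
    void addInner(long ts, long id) {
        entries.addLast(new long[] {ts, id});
        counts.merge(id, 1, Integer::sum);
    }

    // Main stream: is id IN the inner values seen in [ts - range, ts]?
    boolean contains(long ts, long id) {
        evictBefore(ts - rangeMillis);
        return counts.containsKey(id);
    }

    // Drop entries older than cutoff and decrement (or remove) their counts.
    private void evictBefore(long cutoff) {
        while (!entries.isEmpty() && entries.peekFirst()[0] < cutoff) {
            long id = entries.pollFirst()[1];
            counts.computeIfPresent(id, (k, c) -> c == 1 ? null : c - 1);
        }
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        SlidingInSet inner = new SlidingInSet(10 * minute);
        inner.addInner(0, 42L);
        System.out.println(inner.contains(9 * minute, 42L));  // still inside the range
        System.out.println(inner.contains(11 * minute, 42L)); // evicted by now
    }
}
```

The per-id counter matters because the same id can arrive several times
inside one range; it must stay visible until its last occurrence leaves
the window.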

Functionality example
---------------------

We illustrate below the functionality of IN/EXISTS when working with
streams.

```sql
SELECT * FROM stream1 WHERE id IN (
  SELECT id2 FROM stream2 WHERE b > 10 GROUP BY FLOOR(PROCTIME TO HOUR), id2
)
```


Note: The inner query triggers only once per hour. During the next hour,
the result of the inner query for the previous hour is the one used to
filter the results of the main query as they arrive. This is also
consistent with how inner queries are translated (see inner queries).


||Ingestion time (event)||Stream1||Stream2||Output||
|10:00:01|Id1,10| |nil|
|10:02:00| |Id2,2| |
|11:25:00| |Id3,15| |
|12:03:00|Id2,15| |nil|
|12:05:00|Id3,11| |Id3,11|
|12:06:00| |Id2,30| |
|12:07:00| |Id3,2| |
|12:09:00|Id2,17| |nil|
|12:10:00|Id3,20| |Id3,20|
|...| | | |
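The table can be reproduced with a small plain-Java simulation of the
semantics in the note above: stream2 ids with b > 10 are collected per
FLOOR(PROCTIME TO HOUR) bucket, and a stream1 row is emitted only if its
id is in the set collected during the previous hour. The
`HourlyInSimulation` name, the event encoding, and the choice to treat
an hour without inner results as an empty set are assumptions made for
illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation: inner query = ids from stream2 with b > 10,
// grouped by FLOOR(PROCTIME TO HOUR); stream1 rows are checked against
// the inner result of the previous hour.
final class HourlyInSimulation {
    static final long HOUR = 3_600_000L;

    // One event: arrival time (ms since midnight), source stream (1 or 2), id, b.
    record Event(long ts, int stream, String id, int b) {}

    static long floorToHour(long ts) {
        return ts - Math.floorMod(ts, HOUR); // FLOOR(proctime TO HOUR)
    }

    static long at(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    static List<String> run(List<Event> events) {
        Map<Long, Set<String>> innerByHour = new HashMap<>(); // hour bucket -> ids
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            long hour = floorToHour(e.ts());
            if (e.stream() == 2) {
                if (e.b() > 10) {
                    innerByHour.computeIfAbsent(hour, h -> new HashSet<>()).add(e.id());
                }
            } else {
                // Only the completed result of the previous hour is visible.
                Set<String> previous = innerByHour.getOrDefault(hour - HOUR, Set.of());
                if (previous.contains(e.id())) {
                    output.add(e.id() + "," + e.b());
                }
            }
        }
        return output;
    }

    // The rows of the table above, in arrival order.
    static List<Event> tableEvents() {
        return List.of(
            new Event(at(10, 0, 1), 1, "Id1", 10),
            new Event(at(10, 2, 0), 2, "Id2", 2),
            new Event(at(11, 25, 0), 2, "Id3", 15),
            new Event(at(12, 3, 0), 1, "Id2", 15),
            new Event(at(12, 5, 0), 1, "Id3", 11),
            new Event(at(12, 6, 0), 2, "Id2", 30),
            new Event(at(12, 7, 0), 2, "Id3", 2),
            new Event(at(12, 9, 0), 1, "Id2", 17),
            new Event(at(12, 10, 0), 1, "Id3", 20));
    }

    public static void main(String[] args) {
        System.out.println(run(tableEvents())); // [Id3,11, Id3,20]
    }
}
```

The output matches the Output column: Id2,30 arrives at 12:06, so it
only becomes visible to stream1 rows from 13:00 on, which is why Id2,17
at 12:09 still yields nil.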



Implementation option
---------------------

Considering that the query only makes sense in the context of 1) window
boundaries and 2) sub-queries that extract collections of data, the
main design is based on the inner query implementation with the
following modifications. (As a recap, the inner query is implemented
with a special join [left type with an always-true condition] between
the main stream and the output of the inner query, which is passed
through a single-value selection aggregation.)

1)  The condition for outputting a result from the LogicalJoin is no
    longer always true. Instead, the condition is evaluated within the
    window function by checking that the input from the main stream is
    within the collection from the inner query.

2)  The check is done specifically based on the type of function used
    (IN, ANY, SOME, etc.). The logic of each such function needs a
    direct implementation.

3)  The filter on the inner query to keep a single value is removed and
    instead a collection is passed for evaluation in the join.

4)  The boundaries of the SQL query are to be used as the boundaries to
    define the join window in which the verification is done.

5)  From the condition's point of view, the join behaves as an INNER
    JOIN (a value is emitted only if it exists on the other side).
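Item 2 calls for a direct implementation of each operator. A plain-Java
sketch of such a dispatch, assuming the inner-query collection of the
current window is already materialized and ignoring SQL NULL
(three-valued) semantics, which a real implementation would have to
handle; `SubqueryPredicate` and its enum are hypothetical. SOME is a
synonym of ANY in SQL, so both map to the same case, and the comparison
used by ANY/ALL (e.g. `>`) is passed in as a `BiPredicate`.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical evaluator for subquery predicates over one window's collection.
// SQL NULL handling (three-valued logic) is deliberately left out.
final class SubqueryPredicate {
    // SOME is a synonym of ANY and is mapped to ANY.
    enum Kind { IN, NOT_IN, EXISTS, NOT_EXISTS, ANY, ALL }

    static <T> boolean eval(Kind kind, T value, Collection<T> inner, BiPredicate<T, T> cmp) {
        switch (kind) {
            case IN:         return inner.contains(value);
            case NOT_IN:     return !inner.contains(value);
            case EXISTS:     return !inner.isEmpty(); // value and cmp are unused
            case NOT_EXISTS: return inner.isEmpty();
            // value <cmp> ANY (...): true if some element satisfies cmp; false if empty.
            case ANY:        return inner.stream().anyMatch(x -> cmp.test(value, x));
            // value <cmp> ALL (...): true if every element satisfies cmp; true if empty.
            case ALL:        return inner.stream().allMatch(x -> cmp.test(value, x));
            default:         throw new IllegalArgumentException("unsupported: " + kind);
        }
    }

    public static void main(String[] args) {
        List<Integer> salaries = List.of(3000, 4600, 5200);
        BiPredicate<Integer, Integer> greater = (a, b) -> a > b;
        System.out.println(eval(Kind.ANY, 5000, salaries, greater)); // 5000 > ANY (...)
        System.out.println(eval(Kind.ALL, 5000, salaries, greater)); // 5000 > ALL (...)
    }
}
```

Note the empty-collection behavior: ANY is false and ALL is true when the
window produced no inner rows, which matters for windows in which the
secondary stream is silent.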

[See attached document for schema]

General logic of Join
---------------------

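```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// ConstantConditionSelector and FlatJoinFunctionWithInSelection are the
// custom classes proposed in this issue, not existing Flink classes.
leftDataStream.join(rightDataStream)
    // A constant key puts all elements of both streams into the same group.
    .where(new ConstantConditionSelector())
    .equalTo(new ConstantConditionSelector())
    // [TIME/COUNT][TUMBLE/SLIDE] window; an hourly processing-time tumbling window is shown.
    .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
    //.trigger(new DefaultTrigger())
    //.evictor(new DefaultEvictor())
    .apply(new FlatJoinFunctionWithInSelection());
```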
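One point to keep in mind for `FlatJoinFunctionWithInSelection`: Flink's
windowed `join(...).apply(...)` calls the join function once per
(left, right) pair that shares a key in a window. With a constant key, a
left row matching k right rows is therefore emitted k times, and a left
row is never seen at all when the right side of the window is empty,
which rules out NOT IN and EXCEPT. `coGroup(...)` instead passes all
left and all right elements of the window as two `Iterable`s, which fits
the "check against the collection" step. The plain-Java sketch below (no
Flink dependency, hypothetical class and method names) contrasts the two
shapes for one window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Contrasts pair-wise join semantics with a collection-based IN check for
// the elements of ONE window (all elements share the constant key).
final class WindowInCheck {
    // Pair-wise: like a join function invoked once per (left, right) pair.
    static List<Long> pairwiseIn(List<Long> left, List<Long> right) {
        List<Long> out = new ArrayList<>();
        for (Long l : left) {
            for (Long r : right) {
                if (l.equals(r)) {
                    out.add(l); // emitted once per matching right element
                }
            }
        }
        return out;
    }

    // Collection-based: like a coGroup function that sees both sides at once.
    static List<Long> collectionIn(Iterable<Long> left, Iterable<Long> right) {
        Set<Long> inner = new HashSet<>();
        right.forEach(inner::add); // materialize the inner-query collection
        List<Long> out = new ArrayList<>();
        for (Long l : left) {
            if (inner.contains(l)) {
                out.add(l); // emitted at most once per left element
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> left = List.of(1L, 2L, 3L);
        List<Long> right = List.of(2L, 2L, 4L);
        System.out.println(pairwiseIn(left, right));   // id 2 appears twice
        System.out.println(collectionIn(left, right)); // id 2 appears once
    }
}
```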




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
