radu created FLINK-6073:
---------------------------

             Summary: Support for SQL inner queries for proctime
                 Key: FLINK-6073
                 URL: https://issues.apache.org/jira/browse/FLINK-6073
             Project: Flink
          Issue Type: Sub-task
          Components: Table API & SQL
            Reporter: radu
            Priority: Critical


Time target: Proc Time


**SQL targeted query examples:**
 
Q1) `Select item, (select item2 from stream2) as itemExtern from stream1;`

Comments: This is the main functionality targeted by this JIRA: combining the 
results of an inner query into the main query.

Q2) `Select s1.item, (Select a2 from table as t2 where t2.id = s1.id limit 1) from s1;`


Comments:
Another equivalent way to write the first example of an inner query is with 
LIMIT 1. This ensures equivalence with the SingleElementAggregation used when 
translating the main target syntax for inner queries. We must ensure that both 
syntaxes are supported and implemented with the same functionality. 
The inner query may also select elements from a table rather than from a 
different stream. Implementing this support should be a separate sub-JIRA 
issue.


**Description:**
When Calcite parses the SQL inner query, it translates it into a join (a left 
join with an always-true condition) between the output of the query on the main 
stream and the output of a single-output aggregation on the inner query. The 
translation logic is shown below:
```
LogicalJoin [condition=true;type=LEFT]
        LogicalSingleValue[type=aggregation]
                …logic of inner query (LogicalProject, LogicalScan…)
        …logic of main, external query (LogicalProject, LogicalScan…)
```

`LogicalJoin[condition=true;type=LEFT]`: this can be considered a special-case 
operation rather than a proper stream-to-stream join. The implementation should 
attach to each output of the main stream a value produced by a different query. 

`LogicalSingleValue[type=aggregation]`: this can be interpreted as the holder of 
the single value that results from the inner query. Because this operator 
guarantees that the inner query passes no more than one value to the join, 
there are several options for its behavior in a streaming context:
1. Throw an error if the inner query returns more than one result. This is the 
typical behavior of standard SQL over a database. However, a stream is very 
unlikely to emit only a single value, so this behavior would be of limited use 
for streams in the inner query. It might be more useful and common if the inner 
query is over a table. 
2. Interpret this operator as a guarantee that only one value is selected at 
any given moment. The operator then behaves like a filter that selects one 
value, and its output can evolve over time with the second stream that drives 
the inner query. When the value evolves should depend on what marks the 
progress of the stream (processing time, watermarks/event time, ingestion 
time, window time partitions…).
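As a minimal, Flink-free sketch (the class and enum names are hypothetical, not part of Flink or Calcite), the two candidate behaviors of `LogicalSingleValue` differ only in what happens when a second value arrives:

```java
// Hypothetical sketch of the two candidate LogicalSingleValue behaviors.
// SingleValueHolder, Policy, STRICT and LATEST are illustrative names only.
final class SingleValueHolder<T> {
    enum Policy { STRICT, LATEST }

    private final Policy policy;
    private T value;           // null until the inner query emits something
    private boolean hasValue;  // distinguishes "no value yet" from a held value

    SingleValueHolder(Policy policy) {
        this.policy = policy;
    }

    void offer(T next) {
        if (hasValue && policy == Policy.STRICT) {
            // Option 1: classic SQL semantics, a second row is an error.
            throw new IllegalStateException("inner query returned more than one value");
        }
        // Option 2: the held value evolves; the newest element replaces the old one.
        value = next;
        hasValue = true;
    }

    T current() {
        return value;
    }
}
```

With `LATEST`, offering 1.2 and then 1.3 leaves 1.3 as the current value; with `STRICT`, the second offer throws.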

In this JIRA issue, the evolution is marked by processing time, and the 
operator follows option 2. At every moment, the operator state that holds the 
single value can be updated with the latest element. The inner query therefore 
always selects the last element (its fields, or other query-related 
transformations based on the last value). This behavior is needed in many 
scenarios, e.g., the typical problem of computing the total income when incomes 
are in multiple currencies and the total must be computed in one currency, 
always using the latest exchange rate.
This behavior is also motivated by the functionality of the third SQL query 
example, Q3 (using an inner query as the input source for FROM). In such 
scenarios, the selection in the main query would need to be based on the latest 
elements. With this behavior, the two types of queries (Q1 and Q3) provide the 
same, intuitive result.
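The currency scenario can be illustrated with a small plain-Java sketch. It assumes one rate per currency, expressed as target-currency units per source-currency unit; `RunningIncomeTotal`, `onRate` and `onIncome` are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: incomes arrive in several currencies and each one is
// converted with the latest known rate before being added to a running total.
final class RunningIncomeTotal {
    // Latest rate per currency (target units per 1 unit of that currency).
    private final Map<String, Double> latestRate = new HashMap<>();
    private double total;

    // Inner-query side: a new rate simply overwrites the previous one.
    void onRate(String currency, double rate) {
        latestRate.put(currency, rate);
    }

    // Main-query side: convert with whatever rate is current at processing time.
    // This sketch rejects incomes that arrive before any rate for their currency.
    double onIncome(String currency, double amount) {
        Double rate = latestRate.get(currency);
        if (rate == null) {
            throw new IllegalStateException("no rate seen yet for " + currency);
        }
        total += amount * rate;
        return total;
    }
}
```

After a rate of 2.0 and an income of 10, the total is 20.0; after the rate changes to 3.0, a further income of 5 raises it to 35.0, because the earlier income is not revalued.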


**Functionality example**
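Based on the logical translation plan, the following example shows the behavior 
of the inner query applied to two streams that operate on processing time. The 
main query reads the amounts from Stream1 and the inner query reads the exchange 
rate from Stream2:
`SELECT amount, (SELECT exchange FROM inputstream2) AS field1 FROM inputstream1`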

 
<table class="tg">
  <tr>
    <th class="tg-9hbo">Time</th>
    <th class="tg-9hbo">Stream1</th>
    <th class="tg-9hbo">Stream2</th>
    <th class="tg-9hbo">Output</th>
  </tr>
  <tr>
    <td class="tg-yw4l">T1</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">1.2</td>
    <td class="tg-yw4l"></td>
  </tr>
  <tr>
    <td class="tg-yw4l">T2</td>
    <td class="tg-yw4l">User1,10</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">(10,1.2)</td>
  </tr>
  <tr>
    <td class="tg-yw4l">T3</td>
    <td class="tg-yw4l">User2,11</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">(11,1.2)</td>
  </tr>
  <tr>
    <td class="tg-yw4l">T4</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">1.3</td>
    <td class="tg-yw4l"></td>
  </tr>
  <tr>
    <td class="tg-yw4l">T5</td>
    <td class="tg-yw4l">User3,9</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">(9,1.3)</td>
  </tr>
  <tr>
    <td class="tg-9hbo" colspan="4">...</td>
  </tr>
</table>
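The table can be replayed with a plain-Java sketch that assumes events arrive in exactly the listed order and that option 2 (overwrite the held value) applies; `ProcTimeInnerQueryReplay` is a hypothetical name, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free replay of the processing-time example. Stream2 updates the single
// held exchange rate; every Stream1 row (user, amount) is emitted as
// (amount, current rate).
final class ProcTimeInnerQueryReplay {
    static List<String> replay() {
        List<String> output = new ArrayList<>();
        Double rate = null; // state of the single-value operator

        // T1..T5 from the table, as {stream, payload}
        String[][] events = {
            {"stream2", "1.2"},
            {"stream1", "User1,10"},
            {"stream1", "User2,11"},
            {"stream2", "1.3"},
            {"stream1", "User3,9"},
        };
        for (String[] e : events) {
            if (e[0].equals("stream2")) {
                rate = Double.parseDouble(e[1]); // overwrite, never fail (option 2)
            } else {
                String amount = e[1].split(",")[1];
                output.add("(" + amount + "," + rate + ")");
            }
        }
        return output;
    }
}
```

`replay()` returns `[(10,1.2), (11,1.2), (9,1.3)]`, matching the Output column.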

Note 1. For streams that operate on event time, at moment T4 we would need to 
retract the previous outputs ((10,1.2), (11,1.2)) and re-emit them as 
((10,1.3), (11,1.3)). 
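Note 1 can be sketched as a changelog of insertions (`+`) and retractions (`-`). This hypothetical class follows the note literally: every earlier result is retracted and re-emitted with the new value, without modelling which rows an event-time update should really affect.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Note 1: when a new inner value arrives, every result
// emitted so far is retracted ("-") and then re-emitted ("+") with the new
// value. Deciding which rows an event-time update affects is left out.
final class RetractingSingleValueJoin {
    private final List<Integer> emittedAmounts = new ArrayList<>();
    private final List<String> changelog = new ArrayList<>();
    private Double value; // null until the inner query has produced a value

    List<String> changelog() {
        return changelog;
    }

    // Main stream: emit an insertion paired with the current inner value.
    void onMain(int amount) {
        emittedAmounts.add(amount);
        changelog.add("+(" + amount + "," + value + ")");
    }

    // Inner query: retract all previous results, then re-emit them updated.
    void onInner(double newValue) {
        for (int amount : emittedAmounts) {
            changelog.add("-(" + amount + "," + value + ")");
        }
        for (int amount : emittedAmounts) {
            changelog.add("+(" + amount + "," + newValue + ")");
        }
        value = newValue;
    }
}
```

Replaying 1.2, 10, 11, 1.3 yields `+(10,1.2)`, `+(11,1.2)`, `-(10,1.2)`, `-(11,1.2)`, `+(10,1.3)`, `+(11,1.3)`.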

Note 2. Rather than failing when a new value arrives in the inner query, we just 
update the state that holds the single value. If option 1 is chosen for the 
behavior of LogicalSingleValue, then an error would be triggered at moment T4.



**Implementation option**
Considering the notes and the chosen behavior, the operator would be implemented 
using Flink's join function with a custom always-true join condition and an 
inner selection of the output based on the incoming direction (to mimic the left 
join). The single-value selection can be implemented with a stateful flatMap. If 
the join is executed in parallel by multiple operator instances, then we either 
use a parallelism of 1 for the stateful flatMap (option 1) or broadcast the 
outputs of the flatMap to all join instances to ensure consistent results 
(option 2). Since the flatMap logic of selecting one value is lightweight, 
option 1 is preferable. The design schema is shown below.


> [See Picture in the document] 
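Whichever option is chosen, the consistency requirement is the same: every parallel join instance must pair its share of the main stream with the same single value. A minimal sketch, assuming the selector's updates are delivered to all join instances (all names are hypothetical, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one stateful single-value selector forwards every
// update to all parallel join instances, so rows routed to different
// instances are paired with the same inner value.
final class BroadcastSingleValueSketch {
    static final class JoinInstance {
        private Double innerValue; // local copy of the forwarded value
        final List<String> out = new ArrayList<>();

        void onBroadcast(double v) {
            innerValue = v;
        }

        void onMain(int amount) {
            out.add("(" + amount + "," + innerValue + ")");
        }
    }

    static final class SingleValueSelector {
        private final List<JoinInstance> targets;
        private Double latest; // the single value currently selected

        SingleValueSelector(List<JoinInstance> targets) {
            this.targets = targets;
        }

        // Keep only the newest value and forward it to every join instance.
        void onInner(double v) {
            latest = v;
            for (JoinInstance t : targets) {
                t.onBroadcast(latest);
            }
        }
    }
}
```

If the update reached only one instance instead, rows processed by the other instances would keep pairing with a stale value (or none), which is the inconsistency the design has to avoid.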


**General logic of Join**
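```
// Sketch only: ConstantConditionSelector, LeftFireTrigger, LatestValueEvictor
// and InnerQueryJoinFunction are custom classes that still have to be written.
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;

leftDataStream.join(rightDataStream)
        // the same constant key on both sides emulates the always-true condition
        .where(new ConstantConditionSelector())
        .equalTo(new ConstantConditionSelector())
        // assumption: GlobalWindows, so only the custom trigger decides when to emit
        .window(GlobalWindows.create())
        // fire whenever an element arrives on the main (left) stream
        .trigger(new LeftFireTrigger())
        // keep only the latest inner-query value; drop already-joined left elements
        .evictor(new LatestValueEvictor())
        .apply(new InnerQueryJoinFunction());
```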
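The intended interplay of the constant key, `LeftFireTrigger` and the evictor can be modelled without Flink. This sketch assumes one constant key (hence a single window), firing on every left element, keeping only the newest right element, and dropping left elements once they have been joined; the class is hypothetical and does not implement Flink's `Trigger` or `Evictor` interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free model of the window-join plan: all elements share one constant
// key and therefore one window. Comments map each step to the role played by
// the hypothetical LeftFireTrigger, evictor and join function.
final class ConstantKeyWindowJoinModel {
    private final List<Integer> leftBuffer = new ArrayList<>();  // main stream
    private final List<Double> rightBuffer = new ArrayList<>();  // inner query
    final List<String> emitted = new ArrayList<>();

    void onRight(double v) {
        rightBuffer.add(v);
        // No firing here: only left elements trigger output (LeftFireTrigger role).
    }

    void onLeft(int amount) {
        leftBuffer.add(amount);
        fire();
    }

    private void fire() {
        // Evictor role: keep only the newest right element before joining.
        while (rightBuffer.size() > 1) {
            rightBuffer.remove(0);
        }
        Double right = rightBuffer.isEmpty() ? null : rightBuffer.get(0);
        // Join function role: pair each pending left element with the value,
        // emitting null when no inner value exists yet (LEFT join semantics).
        for (int amount : leftBuffer) {
            emitted.add("(" + amount + "," + right + ")");
        }
        // Evictor role: joined left elements are dropped so they fire only once.
        leftBuffer.clear();
    }
}
```

Feeding the table's events after an early left element 5 yields `(5,null)`, `(10,1.2)`, `(11,1.2)`, `(9,1.3)`.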




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
