Hi Hequn,
        I want to implement stream join dimension in Flink SQL, I found there 
is a new feature named Temporal Tables delivered by Flink1.7, I think it maybe 
could be used to achieve the join between stream and dimension table. But I am 
not sure about that. Could anyone help me about it? 
        Thanks a lot for your help.

Best 
Henry

> 在 2018年9月26日,上午12:16,Hequn Cheng <chenghe...@gmail.com> 写道:
> 
> Hi vino,
> 
> Thanks for sharing the link. It's a great book and I will take a look. 
> There are kinds of join. Different joins have different semantics. From the 
> link, I think it means the time versioned join.  FLINK-9712 
> <https://issues.apache.org/jira/browse/FLINK-9712> enrichments joins with 
> Time Versioned Functions and the result is deterministic under eventime.  
> 
> Best, Hequn
> 
> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com 
> <mailto:yanghua1...@gmail.com>> wrote:
> Hi Hequn,
> 
> The specific content of the book does not give a right or wrong conclusion, 
> but it illustrates this phenomenon: two streams of the same input, playing 
> and joining at the same time, due to the order of events, the connection 
> results are uncertain. This is because the two streams are intertwined in 
> different forms. This has nothing to do with orderby, just that it exists in 
> the stream stream join. Of course, this phenomenon is only a comparison 
> statement with a non-stream join.
> 
> In addition, I recommend this book, which is very famous on Twitter and 
> Amazon. Because you are also Chinese, there is a good translation here. If I 
> guess it is correct, the main translator is also from your company. This part 
> of what I mentioned is here.[1]
> 
> [1]: 
> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>  
> <https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7>
> 
> Thanks, vino.
> 
> Hequn Cheng <chenghe...@gmail.com <mailto:chenghe...@gmail.com>> 
> 于2018年9月25日周二 下午9:45写道:
> Hi vino,
> 
> There are no order problems of stream-stream join in Flink. No matter what 
> order the elements come, stream-stream join in Flink will output results 
> which consistent with standard SQL semantics. I haven't read the book you 
> mentioned. For join, it doesn't guarantee output orders. You have to do 
> orderBy if you want to get ordered results.
> 
> Best, Hequn
> 
> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com 
> <mailto:yanghua1...@gmail.com>> wrote:
> Hi Fabian,
> 
> I may not have stated it here, and there is no semantic problem at the Flink 
> implementation level. Rather, there may be “Time-dependence” here. [1]
> 
> Yes, my initial answer was not to use this form of join in this scenario, but 
> Henry said he converted the table into a stream table and asked about the 
> feasibility of other methods.
> 
> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3: 
> Derived Data, Chapter 11: Stream Processing , Stream Joins.
> 
> some content :
> If the ordering of events across streams is undetermined, the join becomes 
> nondeter‐ ministic [87], which means you cannot rerun the same job on the 
> same input and necessarily get the same result: the events on the input 
> streams may be interleaved in a different way when you run the job again. 
> 
> 
> 
> Fabian Hueske <fhue...@gmail.com <mailto:fhue...@gmail.com>> 于2018年9月25日周二 
> 下午8:08写道:
> Hi,
> 
> I don't think that using the current join implementation in the Table API / 
> SQL will work.
> The non-windowed join fully materializes *both* input tables in state. This 
> is necessary, because the join needs to be able to process updates on either 
> side.
> While this is not a problem for the fixed sized MySQL table, materializing 
> the append-only table (aka stream) is probably not what you want.
> You can also not limit idle state retention because it would remove the MySQL 
> table from state at some point.
> 
> The only way to make it work is using a user-defined TableFunction that 
> queries the MySQL table via JDBC. 
> However, please note that these calls would be synchronous, blocking calls.
> 
> @Vino: Why do you think that the stream & stream join is not mature and which 
> problems do you see in the semantics? 
> The semantics are correct (standard SQL semantics) and in my opinion the 
> implementation is also mature.
> However, you should not use the non-windowed join if any of the input tables 
> is ever growing because both sides must be hold in state. This is not an 
> issue of the semantics.
> 
> Cheers,
> Fabian
> 
> Am Di., 25. Sep. 2018 um 14:00 Uhr schrieb vino yang <yanghua1...@gmail.com 
> <mailto:yanghua1...@gmail.com>>:
> Hi Henry,
> 
> 1) I don't recommend this method very much, but you said that you expect to 
> convert mysql table to stream and then to flink table. Under this premise, I 
> said that you can do this by joining two stream tables. But as you know, this 
> join depends on the time period in which the state is saved. To make it 
> equivalent to a dimension table, you must permanently save the state of the 
> stream table that is defined as a "dimension table." I just said that 
> modifying the relevant configuration in Flink can do this, Not for a single 
> table.
> 
> 2) Imagine that there are one million records in two tables. The records in 
> both tables are just beginning to stream into flink, and the records as 
> dimension tables are not fully arrived. Therefore, your matching results may 
> not be as accurate as directly querying Mysql.
> 
> In fact, the current stream & stream join is not very mature, there are some 
> problems in semantics, I personally recommend that you return to stream/batch 
> (mysql) join. For more principle content, I recommend you read a book, 
> referred to as 《DDIA》.
> 
> Thanks, vino.
> 
> 徐涛 <happydexu...@gmail.com <mailto:happydexu...@gmail.com>> 于2018年9月25日周二 
> 下午5:48写道:
> Hi Vino,
>       I do not quite understand in some sentences below, would you please 
> help explain it a bit more detailedly?
>       1. “such as setting the state retention time of one of the tables to be 
> permanent” , as I know, the state retention time is a global config, I can 
> not set this property per table.
>       2. "you may not be able to match the results, because the data 
> belonging to the mysql table is just beginning to play as a stream”  Why it 
> is not able to match the results?
> 
> Best
> Henry
> 
>> 在 2018年9月25日,下午5:29,vino yang <yanghua1...@gmail.com 
>> <mailto:yanghua1...@gmail.com>> 写道:
>> 
>> Hi Henry,
>> 
>> If you have converted the mysql table to a flink stream table. In flink 
>> table/sql, streams and stream joins can also do this, such as setting the 
>> state retention time of one of the tables to be permanent. But when the job 
>> is just running, you may not be able to match the results, because the data 
>> belonging to the mysql table is just beginning to play as a stream.
>> 
>> Thanks, vino.
>> 
>> 徐涛 <happydexu...@gmail.com <mailto:happydexu...@gmail.com>> 于2018年9月25日周二 
>> 下午5:10写道:
>> Hi Vino & Hequn,
>>      I am now using the table/sql API, if I import the mysql table as a 
>> stream then convert it into a table, it seems that it can also be a 
>> workaround for batch/streaming joining. May I ask what is the difference 
>> between the UDTF method? Does this implementation has some defects?
>>      
>> Best
>> Henry
>> 
>>> 在 2018年9月22日,上午10:28,Hequn Cheng <chenghe...@gmail.com 
>>> <mailto:chenghe...@gmail.com>> 写道:
>>> 
>>> Hi
>>> 
>>> +1 for vino's answer. 
>>> Also, this kind of join will be supported in FLINK-9712 
>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more 
>>> details in the jira.
>>> 
>>> Best, Hequn
>>> 
>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com 
>>> <mailto:yanghua1...@gmail.com>> wrote:
>>> Hi Henry,
>>> 
>>> There are three ways I can think of:
>>> 
>>> 1) use DataStream API, implement a flatmap UDF to access dimension table;
>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>> 3) customize the table/sql join API/statement's implementation (and change 
>>> the physical plan)
>>> 
>>> Thanks, vino.
>>> 
>>> 徐涛 <happydexu...@gmail.com <mailto:happydexu...@gmail.com>> 于2018年9月21日周五 
>>> 下午4:43写道:
>>> Hi All,
>>>         Sometimes some “dimension table” need to be joined from the "fact 
>>> table", if data are not joined before sent to Kafka.
>>>         So if the data are joined in Flink, does the “dimension table” have 
>>> to be import as a stream, or there are some other ways can achieve it?
>>>         Thanks a lot!
>>> 
>>> Best
>>> Henry
>> 
> 

Reply via email to