Hi Hequn,

I want to implement a join between a stream and a dimension table in Flink SQL. I found that Flink 1.7 introduced a new feature called Temporal Tables, and I think it might be usable for joining a stream with a dimension table, but I am not sure. Could anyone help me with this? Thanks a lot for your help.
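The idea behind a temporal (time-versioned) table join, which Henry asks about here, can be sketched in plain Python: the dimension table keeps every version of a row together with the time it became valid, and each fact record is joined with the version that was valid at the record's own timestamp. This only illustrates the semantics under simplifying assumptions (integer timestamps, all versions already present at lookup time); it is not Flink's Temporal Table API, and the class `VersionedTable`, the function `temporal_join`, and the currency-rate data are hypothetical.

```python
import bisect


class VersionedTable:
    """Hypothetical versioned dimension table: each key keeps all of its versions."""

    def __init__(self):
        self._versions = {}  # key -> (sorted valid_from list, aligned value list)

    def upsert(self, key, valid_from, value):
        # Store a new version, keeping versions ordered by valid_from.
        times, values = self._versions.setdefault(key, ([], []))
        idx = bisect.bisect_right(times, valid_from)
        times.insert(idx, valid_from)
        values.insert(idx, value)

    def lookup(self, key, at_time):
        # Return the latest version with valid_from <= at_time, or None.
        times, values = self._versions.get(key, ([], []))
        idx = bisect.bisect_right(times, at_time)
        return values[idx - 1] if idx > 0 else None


def temporal_join(facts, table):
    """Join each (time, key, amount) fact with the version valid at its time."""
    return [(t, key, amount, table.lookup(key, t)) for t, key, amount in facts]


rates = VersionedTable()
rates.upsert("EUR", 10, 1.20)  # versions may be registered out of order
rates.upsert("EUR", 0, 1.10)

orders = [(5, "EUR", 2), (12, "EUR", 3), (12, "USD", 1)]
print(temporal_join(orders, rates))
# [(5, 'EUR', 2, 1.1), (12, 'EUR', 3, 1.2), (12, 'USD', 1, None)]
```

Because each result depends only on the fact's timestamp and not on when a dimension version was processed, replaying the same input yields the same output, which is the determinism property discussed later in the thread for event-time versioned joins.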
Best
Henry

> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi vino,
>
> Thanks for sharing the link. It's a great book and I will take a look.
> There are many kinds of joins, and different joins have different semantics. From the link, I think it refers to a time-versioned join. FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> covers enrichment joins with Time Versioned Functions, and the result is deterministic under event time.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
> Hi Hequn,
>
> The book does not draw a right-or-wrong conclusion, but it illustrates this phenomenon: when the same input is played as two streams and joined, the join results can be nondeterministic because of the order of events. This is because the two streams can be interleaved in different ways. This has nothing to do with orderBy; it is simply inherent to stream-stream joins. Of course, this is only in comparison with a non-streaming join.
>
> In addition, I recommend this book, which is very well known on Twitter and Amazon. Since you are also Chinese, note that there is a good Chinese translation. If I guess correctly, the main translator is also from your company. The part I mentioned is here: [1]
>
> [1]: https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>
> Thanks, vino.
>
> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi vino,
>
> There are no ordering problems with stream-stream joins in Flink. No matter in what order the elements arrive, a stream-stream join in Flink will output results consistent with standard SQL semantics. I haven't read the book you mentioned.
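Hequn's claim here, that a non-windowed stream-stream join produces the same set of results regardless of arrival order while the emission order may vary, can be illustrated with a minimal symmetric hash join. This is a plain-Python sketch of the general technique under simplifying assumptions (insert-only inputs, no updates or retractions, inner join only); it is not Flink's join operator, and the function name `symmetric_hash_join` and the sample rows are hypothetical.

```python
from collections import defaultdict


def symmetric_hash_join(events):
    """Inner-join an interleaved stream of (side, key, payload) insert events.

    side is "L" (fact) or "R" (dimension). Both sides are kept in state
    forever, as in a non-windowed join, so a row still finds its match when
    the other side arrives later.
    """
    state = {"L": defaultdict(list), "R": defaultdict(list)}
    output = []
    for side, key, payload in events:
        other = "R" if side == "L" else "L"
        state[side][key].append(payload)  # materialize this row (state never shrinks)
        for match in state[other].get(key, []):  # probe the opposite side
            left, right = (payload, match) if side == "L" else (match, payload)
            output.append((key, left, right))
    return output


facts = [("L", 1, "order-a"), ("L", 2, "order-b")]
dims = [("R", 1, "user-1"), ("R", 2, "user-2")]

run1 = symmetric_hash_join(dims + facts)        # dimension rows arrive first
run2 = symmetric_hash_join(facts + dims[::-1])  # facts first, dimensions reversed

print(run1)  # [(1, 'order-a', 'user-1'), (2, 'order-b', 'user-2')]
print(run2)  # [(2, 'order-b', 'user-2'), (1, 'order-a', 'user-1')]
print(sorted(run1) == sorted(run2))  # True
```

Both runs emit the same set of joined rows; only the emission order differs, which is why an explicit ordering step is needed when ordered output matters. The state dictionaries never shrink, which is the unbounded-state concern for an ever-growing fact stream.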
> For joins, Flink doesn't guarantee output order. You have to use orderBy if you want ordered results.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
> Hi Fabian,
>
> I may not have stated it clearly: there is no semantic problem at the Flink implementation level. Rather, there may be a "time-dependence" issue here. [1]
>
> Yes, my initial answer was not to use this form of join in this scenario, but Henry said he had converted the table into a stream table and asked about the feasibility of other methods.
>
> [1]: 《Designing Data-Intensive Applications》 by Martin Kleppmann, Part 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>
> Some content:
> If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.
>
> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> I don't think that using the current join implementation in the Table API / SQL will work.
> The non-windowed join fully materializes *both* input tables in state. This is necessary because the join needs to be able to process updates on either side.
> While this is not a problem for the fixed-size MySQL table, materializing the append-only table (aka stream) is probably not what you want.
> You also cannot limit idle state retention, because it would remove the MySQL table from state at some point.
>
> The only way to make it work is using a user-defined TableFunction that queries the MySQL table via JDBC.
> However, please note that these calls would be synchronous, blocking calls.
>
> @Vino: Why do you think that the stream & stream join is not mature, and which problems do you see in the semantics?
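Fabian's remark that idle state retention cannot be used here (it would eventually drop the MySQL rows from state) can be made concrete with a small simulation. The sketch below is a simplified model, assuming per-key cleanup that clears both sides of a key once it has been idle for longer than a retention period; Flink's real cleanup is timer-based and differs in detail, and the function `join_with_idle_retention` and all data are hypothetical.

```python
from collections import defaultdict


def join_with_idle_retention(events, retention):
    """Inner join with a simplified per-key idle-state cleanup.

    events: (time, side, key, payload) tuples in time order, side in {"L", "R"}.
    Once a key has not been accessed for more than `retention` time units,
    its state on BOTH sides is dropped. This is a simplified model for
    illustration, not Flink's exact timer-based cleanup.
    """
    state = {"L": defaultdict(list), "R": defaultdict(list)}
    last_access = {}
    output = []
    for now, side, key, payload in events:
        # Evict every key that has been idle for longer than the retention time.
        for idle_key in [k for k, t in last_access.items() if now - t > retention]:
            state["L"].pop(idle_key, None)
            state["R"].pop(idle_key, None)
            del last_access[idle_key]
        other = "R" if side == "L" else "L"
        state[side][key].append(payload)
        last_access[key] = now
        for match in state[other].get(key, []):
            left, right = (payload, match) if side == "L" else (match, payload)
            output.append((now, key, left, right))
    return output


# Dimension rows are replayed once at startup; orders arrive later.
dimension = [(0, "R", 1, "user-1"), (0, "R", 2, "user-2")]
orders = [(5, "L", 1, "order-a"), (20, "L", 2, "order-b")]

print(join_with_idle_retention(dimension + orders, retention=10))
# [(5, 1, 'order-a', 'user-1')]
print(join_with_idle_retention(dimension + orders, retention=100))
# [(5, 1, 'order-a', 'user-1'), (20, 2, 'order-b', 'user-2')]
```

With a 10-unit retention, the dimension row for key 2 is evicted before `order-b` arrives, so that order silently goes unmatched even though the row still exists in MySQL; a long retention keeps the match but lets the order side's state grow without bound.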
> The semantics are correct (standard SQL semantics), and in my opinion the implementation is also mature.
> However, you should not use the non-windowed join if any of the input tables is ever-growing, because both sides must be held in state. This is not an issue of the semantics.
>
> Cheers,
> Fabian
>
> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
> Hi Henry,
>
> 1) I don't really recommend this method, but you said that you expect to convert the MySQL table to a stream and then to a Flink table. Under this premise, I said that you can do this by joining two stream tables. But as you know, this join depends on how long the state is retained. To make it equivalent to a dimension table, you must permanently keep the state of the stream table that serves as the "dimension table". I only meant that this can be done by modifying the relevant configuration in Flink, not for a single table.
>
> 2) Imagine that there are one million records in the two tables. The records of both tables are just beginning to stream into Flink, and the dimension-table records have not all arrived yet. Therefore, your matching results may not be as accurate as directly querying MySQL.
>
> In fact, the current stream & stream join is not very mature, and there are some problems in its semantics. I personally recommend that you go back to a stream/batch (MySQL) join. For more on the underlying principles, I recommend a book usually referred to as 《DDIA》.
>
> Thanks, vino.
>
> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
> Hi Vino,
> I do not quite understand some of the sentences below. Would you please explain them in a bit more detail?
> 1. "such as setting the state retention time of one of the tables to be permanent": as far as I know, the state retention time is a global config; I cannot set this property per table.
> 2.
>    "you may not be able to match the results, because the data belonging to the MySQL table is just beginning to play as a stream": why would it not be able to match the results?
>
> Best
> Henry
>
>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> If you have converted the MySQL table to a Flink stream table, a stream-stream join in Flink Table/SQL can also do this, for example by setting the state retention time of one of the tables to be permanent. But when the job has just started, you may not be able to match the results, because the data belonging to the MySQL table is just beginning to play as a stream.
>>
>> Thanks, vino.
>>
>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>> Hi Vino & Hequn,
>> I am now using the Table/SQL API. If I import the MySQL table as a stream and then convert it into a table, it seems that this can also be a workaround for batch/streaming joins. May I ask how this differs from the UDTF method? Does this implementation have any defects?
>>
>> Best
>> Henry
>>
>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi
>>>
>>> +1 for vino's answer.
>>> Also, this kind of join will be supported in FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712>. You can find more details in the JIRA issue.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Henry,
>>>
>>> There are three ways I can think of:
>>>
>>> 1) use the DataStream API and implement a flatMap UDF to access the dimension table;
>>> 2) use the Table/SQL API and implement a UDTF to access the dimension table;
>>> 3) customize the implementation of the Table/SQL join API/statement (and change the physical plan).
>>>
>>> Thanks, vino.
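Options 1) and 2) in vino's list above, like Fabian's suggestion of a user-defined TableFunction that queries MySQL via JDBC, share one pattern: for every incoming fact record, issue a synchronous point query against the external dimension table and emit the enriched row(s). The plain-Python sketch below illustrates only that pattern; an in-memory `sqlite3` database stands in for MySQL/JDBC, and the class `DimensionLookup`, its `eval` method (loosely named after the convention Flink table functions use), and the `users` schema are hypothetical.

```python
import sqlite3


class DimensionLookup:
    """Hypothetical per-record lookup against an external dimension table.

    sqlite3 stands in for MySQL/JDBC here; every call is a synchronous,
    blocking query issued once per incoming record.
    """

    def __init__(self, conn):
        self.conn = conn

    def eval(self, user_id):
        # One point query per incoming record; yields zero or more matches.
        cursor = self.conn.execute(
            "SELECT name, city FROM users WHERE id = ?", (user_id,)
        )
        yield from cursor.fetchall()


def enrich(facts, lookup):
    """Inner-join each (order_id, user_id) fact with its dimension row(s)."""
    for order_id, user_id in facts:
        for name, city in lookup.eval(user_id):
            yield order_id, user_id, name, city


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, city TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "alice", "Berlin"), (2, "bob", "Hangzhou")],
)

orders = [("o-1", 1), ("o-2", 3), ("o-3", 2)]
print(list(enrich(orders, DimensionLookup(conn))))
# [('o-1', 1, 'alice', 'Berlin'), ('o-3', 2, 'bob', 'Hangzhou')]
```

Order `o-2` has no matching user and is dropped, mirroring inner-join semantics. Nothing from the dimension table is kept in streaming state, and each lookup reads the table's current contents, so a MySQL update is visible to later records immediately; the trade-offs are that results depend on when each record is processed rather than on event time, and each record waits for its query.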
>>>
>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>> Hi All,
>>> Sometimes a "dimension table" needs to be joined with the "fact table" when the data were not joined before being sent to Kafka.
>>> So if the data are joined in Flink, does the "dimension table" have to be imported as a stream, or are there other ways to achieve this?
>>> Thanks a lot!
>>>
>>> Best
>>> Henry