Ok, got it. JdbcIO.readRows() is what I'm looking for. Thanks!
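For reference, a minimal, untested sketch of that approach with the Beam Java SDK. The driver class, connection string, credentials, and the column names (id, name) are hypothetical, and the Kafka side is assumed to already be a PCollection<Row> (one way to build it is sketched further down the thread):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;

    class KafkaMySqlJoinSketch {
      // Joins an existing PCollection<Row> from Kafka against a MySql table
      // read with JdbcIO.readRows(); the Row schema of the JDBC side is
      // inferred from the result-set metadata.
      static PCollection<Row> joinWithTable1(Pipeline pipeline, PCollection<Row> kafkaRows) {
        PCollection<Row> table1 =
            pipeline.apply(
                "ReadTable1",
                JdbcIO.readRows()
                    .withDataSourceConfiguration(
                        // Hypothetical connection details.
                        JdbcIO.DataSourceConfiguration.create(
                                "com.mysql.cj.jdbc.Driver", "jdbc:mysql://host:3306/mydb")
                            .withUsername("user")
                            .withPassword("secret"))
                    .withQuery("SELECT id, name FROM Table1"));

        // The TupleTag ids become the table names visible to the SQL query.
        return PCollectionTuple.of(new TupleTag<>("Table1"), table1)
            .and(new TupleTag<>("Kafka"), kafkaRows)
            .apply(
                SqlTransform.query(
                    "SELECT Table1.*, Kafka.* FROM Kafka JOIN Table1 ON Table1.id = Kafka.id"));
      }
    }

One caveat: Beam SQL places restrictions on joining unbounded and bounded inputs, so depending on the query the Kafka side may need windowing.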
On Tue, Jan 11, 2022 at 2:47 PM Alexey Romanenko <[email protected]> wrote:

> If I understand your problem right, you can just use JdbcIO.readRows(),
> which returns a PCollection<Row> that can be used downstream to create a
> PCollectionTuple, which in turn also contains the PCollection<Row> from
> your Kafka source. So, once you have a PCollectionTuple with two
> TupleTags (one from Kafka and one from MySql), you can apply SqlTransform
> over it.
>
> —
> Alexey
>
> On 11 Jan 2022, at 03:54, Yushu Yao <[email protected]> wrote:
>
> Thanks, Brian, for the explanation. That helps a lot.
> Now I'm clear on the Kafka source side.
>
> A follow-up on the other source, the one in MySql. Suppose I want to run
> the query:
>
>     select Table1.*, Kafka.* from Kafka join Table1 on Table1.key = Kafka.key
>
> I can get the Kafka stream into a PCollection as you said above. What
> about the MySql Table1? Is there a mechanism in Beam that lets me turn
> the MySql table into a PCollection? (Or do I need to import it as a
> PCollection myself? I think there is a Beam SQL extension for that?) And
> does it need to scan the full MySql Table1 to accomplish the above join?
>
> Thanks again!
> -Yushu
>
> On Mon, Jan 10, 2022 at 1:50 PM Brian Hulette <[email protected]> wrote:
>
>> Hi Yushu,
>> Thanks for the questions! To process Kafka data with SqlTransform you
>> have a couple of options: you could use KafkaIO and manually transform
>> the records to produce a PCollection with a Schema [1], or you could use
>> the DDL to describe your Kafka stream as a table [2] and query it
>> directly with SqlTransform. You can find examples of using the DDL with
>> SqlTransform here [3]. Note that the Kafka DDL supports "Generic Payload
>> Handling", so you should be able to configure it to consume JSON, proto,
>> thrift, or avro messages [4]. Would one of those work for you?
>>
>> For your second question about "pushing down" the join on two tables:
>> unfortunately, that's not something we support right now. You'd have to
>> do that sort of optimization manually. This is something we've discussed
>> in the abstract, but it's a ways off.
>>
>> Brian
>>
>> [1] https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
>> [2] https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka
>> [3] https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
>> [4] https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#generic-payload-handling
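For concreteness, a minimal sketch of the first option above: reading with KafkaIO and manually converting each record into a Row with a declared schema, so SqlTransform can treat the collection as a table. The broker address, topic name, and the assumption that key and payload are plain strings are all hypothetical:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.kafka.common.serialization.StringDeserializer;

    class KafkaRowsSketch {
      // Hypothetical schema: a string key ("id") and a string payload.
      static final Schema KAFKA_SCHEMA =
          Schema.builder().addStringField("id").addStringField("payload").build();

      // Reads Kafka records and converts each one into a Row so that
      // SqlTransform can query the collection as a table.
      static PCollection<Row> readKafkaAsRows(Pipeline pipeline) {
        return pipeline
            .apply(
                KafkaIO.<String, String>read()
                    .withBootstrapServers("broker:9092")
                    .withTopic("my-topic")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply(
                MapElements.into(TypeDescriptor.of(Row.class))
                    .via(
                        (KV<String, String> kv) ->
                            Row.withSchema(KAFKA_SCHEMA)
                                .addValues(kv.getKey(), kv.getValue())
                                .build()))
            // Required: declares the Row schema so SqlTransform can plan
            // queries over this collection.
            .setRowSchema(KAFKA_SCHEMA);
      }
    }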
>> On Mon, Jan 10, 2022 at 12:15 PM Yushu Yao <[email protected]> wrote:
>>
>>> Hi Folks,
>>>
>>> A question from a newbie to both Calcite and Beam:
>>>
>>> I understand that Calcite can build an execution-plan tree using
>>> relational algebra and push certain operations down to a "data source",
>>> while also allowing source-specific optimizations.
>>>
>>> I also understand that Beam SQL can run SqlTransform.query() on one or
>>> more PCollection<Row>s, and that Calcite is used to come up with the
>>> execution plan.
>>>
>>> My question: assume I have a MySql table, Table1, and a Kafka stream
>>> called "Kafka". Now I want to do a join, such as looking up a row based
>>> on a key in the Kafka message:
>>>
>>>     select Table1.*, Kafka.* from Kafka join Table1 on Table1.key = Kafka.key
>>>
>>> What's the best way to implement this with Beam SQL? (Note that we
>>> can't hardcode the join, because each incoming Kafka message may need a
>>> different SQL.)
>>>
>>> One step further: if we have two MySql tables, Table1 and Table2, and a
>>> Kafka stream "Kafka", and we want to join those two tables inside MySql
>>> first (maybe with aggregations like sum/count) and then join the result
>>> with the Kafka stream, is there a way to tap into Calcite so that the
>>> join of the two tables is actually pushed into MySql?
>>>
>>> Sorry for the lengthy question, and please let me know if more
>>> clarification is needed.
>>>
>>> Thanks a lot in advance!
>>>
>>> -Yushu
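On the "one step further" question: since Beam SQL won't push the two-table join down into MySql, the manual workaround Brian refers to is to put that join (and any aggregation) into the JDBC query itself, so MySql does the work and Beam only sees the pre-joined result. A sketch with hypothetical table and column names; mysqlConfig and kafkaRows stand for the pieces shown in the earlier sketches:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;

    class ManualPushdownSketch {
      // The two-table join and the aggregation run inside MySql; only the
      // joined, aggregated rows enter the Beam pipeline.
      static PCollection<Row> joinAggregateWithKafka(
          Pipeline pipeline,
          JdbcIO.DataSourceConfiguration mysqlConfig,
          PCollection<Row> kafkaRows) {
        PCollection<Row> prejoined =
            pipeline.apply(
                "ReadPrejoined",
                JdbcIO.readRows()
                    .withDataSourceConfiguration(mysqlConfig)
                    .withQuery(
                        "SELECT t1.id, t1.name, SUM(t2.amount) AS total "
                            + "FROM Table1 t1 JOIN Table2 t2 ON t1.id = t2.id "
                            + "GROUP BY t1.id, t1.name"));

        // Only the stream-to-prejoined-result join runs in Beam SQL.
        return PCollectionTuple.of(new TupleTag<>("Prejoined"), prejoined)
            .and(new TupleTag<>("Kafka"), kafkaRows)
            .apply(
                SqlTransform.query(
                    "SELECT Prejoined.*, Kafka.* FROM Kafka "
                        + "JOIN Prejoined ON Prejoined.id = Kafka.id"));
      }
    }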
