If I understand your problem correctly, you can use JdbcIO.readRows(), which 
returns a PCollection<Row>. Put it into a PCollectionTuple together with the 
PCollection<Row> from your Kafka source, each under its own TupleTag. Once you 
have a PCollectionTuple with two TupleTags (one for Kafka, one for MySQL), you 
can apply SqlTransform to it.
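
A rough sketch of that wiring, in case it helps. It assumes the Kafka records 
have already been converted to a PCollection<Row> with a schema. The class 
name, JDBC driver, URL, and credentials are placeholders I made up, and the 
query is the one from your message. As far as I know, JdbcIO.readRows() runs 
the given query once per pipeline run, so with SELECT * the whole of Table1 is 
read.

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical helper class; not part of Beam.
public class KafkaMySqlJoin {

  // The join from the original question. The TupleTag ids below become
  // the table names that SqlTransform sees.
  public static final String QUERY =
      "SELECT Table1.*, Kafka.* FROM Kafka JOIN Table1 ON Table1.key = Kafka.key";

  // kafkaRows is assumed to be a schema-aware PCollection<Row> built from KafkaIO.
  public static PCollection<Row> joinWithTable1(PCollection<Row> kafkaRows) {
    // Read the MySQL table as Rows; the schema is inferred from the JDBC metadata.
    PCollection<Row> table1 =
        kafkaRows
            .getPipeline()
            .apply(
                "ReadTable1",
                JdbcIO.readRows()
                    .withDataSourceConfiguration(
                        // Placeholder connection settings.
                        JdbcIO.DataSourceConfiguration.create(
                                "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/mydb")
                            .withUsername("user")
                            .withPassword("password"))
                    .withQuery("SELECT * FROM Table1"));

    // Register both inputs under the names used in QUERY, then run the SQL.
    return PCollectionTuple.of(new TupleTag<Row>("Kafka"), kafkaRows)
        .and(new TupleTag<Row>("Table1"), table1)
        .apply(SqlTransform.query(QUERY));
  }
}
```

If you only need part of the table, narrowing the withQuery() statement is the 
manual way to limit what is read from MySQL.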

—
Alexey




> On 11 Jan 2022, at 03:54, Yushu Yao <[email protected]> wrote:
> 
> Thanks, Brian, for the explanation. That helps a lot. 
> Now I'm clear on the Kafka source side. 
> 
> A follow-up on the other source that's in MySql. If I want to do the query:
> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key 
> 
> I can get the Kafka stream into a PCollection as you said above. 
> How about the MySQL Table1? Is there a mechanism in Beam that lets me turn 
> the MySQL table into a PCollection? (Or do I need to import it as a 
> PCollection? I think there is a Beam SQL extension for it?) And does it need 
> to scan the full MySQL Table1 to accomplish the above join? 
> 
> Thanks again!
> -Yushu
> 
> 
> On Mon, Jan 10, 2022 at 1:50 PM Brian Hulette <[email protected]> wrote:
> Hi Yushu,
> Thanks for the questions! To process Kafka data with SqlTransform you have a 
> couple of options: you could use KafkaIO and manually transform the 
> records to produce a PCollection with a Schema [1], or you could use the DDL 
> to describe your Kafka stream as a table [2] and query it directly with 
> SqlTransform. You can find examples of using the DDL with SqlTransform here 
> [3]. Note that the Kafka DDL supports "Generic Payload Handling", so you 
> should be able to configure it to consume JSON, proto, thrift, or avro 
> messages [4]. Would one of those work for you?
> 
> For your second question about "pushing down" the join on 2 tables: 
> unfortunately, that's not something we support right now. You'd have to do 
> that sort of optimization manually. This is something we've discussed in the 
> abstract but it's a ways off.
> 
> Brian
> 
> [1] https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
> [2] https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka
> [3] https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
> [4] https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#generic-payload-handling
> 
> On Mon, Jan 10, 2022 at 12:15 PM Yushu Yao <[email protected]> wrote:
> Hi Folks, 
> 
> Question from a newbie to both Calcite and Beam:
> 
> I understand Calcite can build an execution plan tree with relational 
> algebra and push certain operations down to a "data source". At the same 
> time, it allows source-specific optimizations. 
> 
> I also understand that Beam SQL can run SqlTransform.query() on one or more 
> PCollection<Row> inputs, and that Calcite is used to come up with the 
> execution plan. 
> 
> My question is: suppose I have a MySQL table called Table1 and a Kafka stream 
> called "Kafka". 
> 
> Now I want to do some joins, like looking up a row based on a key in the 
> Kafka message: 
> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key 
> 
> What's the best way to implement this with Beam SQL? (Note that we can't 
> hardcode the join because each input Kafka message may need a different SQL.) 
> 
> One step further: suppose we have 2 MySQL tables, Table1 and Table2, and a 
> Kafka stream "Kafka", and we want to join those 2 tables inside MySQL first 
> (maybe with aggregations like sum/count), then join the result with Kafka. Is 
> there a way to tap into Calcite so that the join of the 2 tables is actually 
> pushed into MySQL? 
> 
> Sorry for the lengthy question, and please let me know if more clarification 
> is needed. 
> 
> Thanks a lot in advance!
> 
> -Yushu
> 
> 
> 
