Thanks, Brian, for the explanation. That helps a lot.
Now I'm clear on the Kafka source side.

A follow-up on the other source, which is in MySQL. If I want to run the query:
select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key

I can get the Kafka stream into a PCollection as you said above.
How about the MySQL Table1? Is there a mechanism in Beam that lets me expose
the MySQL table as a PCollection? (Or do I need to import it into a
PCollection myself? I think there is a Beam SQL extension for that?) And does
the above join require scanning the full MySQL Table1?
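
For reference, here is roughly what I had in mind for the MySQL side, using
JdbcIO to read Table1 into a schema-aware PCollection<Row>. This is just a
sketch from my reading of the docs (not verified), and the driver class, URL,
and credentials are placeholders:

    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Read MySQL Table1 into a PCollection<Row>. I believe JdbcIO.readRows()
    // infers the Beam Schema from the JDBC result set metadata, so the rows
    // could then be handed straight to SqlTransform.
    PCollection<Row> table1Rows = pipeline.apply(
        JdbcIO.readRows()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.cj.jdbc.Driver",     // placeholder driver class
                        "jdbc:mysql://host:3306/mydb")  // placeholder URL
                    .withUsername("user")
                    .withPassword("password"))
            .withQuery("SELECT key, col1, col2 FROM Table1"));

Is that the right direction, or is there a better way?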

Thanks again!
-Yushu


On Mon, Jan 10, 2022 at 1:50 PM Brian Hulette <[email protected]> wrote:

> Hi Yushu,
> Thanks for the questions! To process Kafka data with SqlTransform you have
> a couple of options: you could just use KafkaIO and manually transform the
> records to produce a PCollection with a Schema [1], or you could use the
> DDL to describe your Kafka stream as a table [2] and query it directly
> with SqlTransform. You can find examples of using the DDL with SqlTransform
> here [3]. Note that the Kafka DDL supports "Generic Payload Handling", so
> you should be able to configure it to consume JSON, proto, thrift, or avro
> messages [4]. Would one of those work for you?
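>
> To make the first option concrete, a rough, untested sketch might look like
> this (imports omitted; the broker, topic, and field names are placeholders,
> and you'd adapt the conversion step to your actual payload format):
>
>     // Read raw Kafka records and drop the Kafka metadata.
>     PCollection<KV<String, String>> messages = pipeline.apply(
>         KafkaIO.<String, String>read()
>             .withBootstrapServers("broker:9092")
>             .withTopic("my-topic")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata());
>
>     // Attach a Beam Schema so SqlTransform can query the collection.
>     Schema schema = Schema.builder()
>         .addStringField("key")
>         .addStringField("payload")
>         .build();
>     PCollection<Row> kafkaRows = messages
>         .apply(MapElements.into(TypeDescriptor.of(Row.class))
>             .via((KV<String, String> kv) ->
>                 Row.withSchema(schema)
>                     .addValues(kv.getKey(), kv.getValue())
>                     .build()))
>         .setRowSchema(schema);
>
> With the DDL approach you skip the manual conversion step entirely; the
> examples in [3] show how to register the table and query it with
> SqlTransform.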
>
> For your second question about "pushing down" the join on 2 tables:
> unfortunately, that's not something we support right now. You'd have to do
> that sort of optimization manually. This is something we've discussed in
> the abstract but it's a ways off.
>
> Brian
>
> [1]
> https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
> [2]
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka
> [3]
> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
> [4]
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#generic-payload-handling
>
> On Mon, Jan 10, 2022 at 12:15 PM Yushu Yao <[email protected]> wrote:
>
>> Hi Folks,
>>
>> Question from a Newbie for both Calcite and Beam:
>>
>> I understand that Calcite can build an execution plan tree using relational
>> algebra and push certain operations down to a "data source". At the same
>> time, it allows source-specific optimizations.
>>
>> I also understand that Beam SQL can run SqlTransform.query() on one or more
>> PCollection<Row>s, and that Calcite is used to come up with the execution
>> plan.
>>
>> My question is, assume I have a MySql Table as Table1, and a Kafka Stream
>> called "Kafka".
>>
>> Now I want to do some joins, like looking up a row based on a key in the
>> Kafka message:
>> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key
>>
>> What's the best way to implement this with Beam SQL? (Note that we can't
>> hardcode the join, because each incoming Kafka message may need a different
>> SQL query.)
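>>
>> For context, here is the shape I'm imagining (just my guess at how the
>> pieces fit together; kafkaRows and table1Rows would be the two schema-aware
>> PCollection<Row>s for the stream and the table):
>>
>>     // Name the two inputs so the SQL can refer to them as tables.
>>     PCollection<Row> joined =
>>         PCollectionTuple.of(new TupleTag<Row>("Kafka"), kafkaRows)
>>             .and(new TupleTag<Row>("Table1"), table1Rows)
>>             .apply(SqlTransform.query(
>>                 "select Table1.*, Kafka.* "
>>                     + "from Kafka join Table1 on Table1.key = Kafka.key"));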
>>
>> One step further: suppose we have two MySQL tables, Table1 and Table2, and a
>> Kafka stream "Kafka", and we want to join those two tables inside MySQL first
>> (maybe with aggregations like sum/count), then join the result with the Kafka
>> stream. Is there a way to tap into Calcite so that the join of the two tables
>> is actually pushed down into MySQL?
>>
>> Sorry for the lengthy question, and please let me know if more
>> clarification is needed.
>>
>> Thanks a lot in advance!
>>
>> -Yushu
>>
>>
>>
>>
