Hi Piotr,

I’d like to share my understanding about this. Source and SourceFunction are 
both interfaces to data sources. SourceFunction was designed and introduced 
earlier and as the project evolved, many shortcomings emerged. Therefore, the 
community re-designed the source interface and introduced the new Source API in 
FLIP-27 [1]. 

Finally we will deprecate the SourceFunction and use Source as the only 
interface for all data sources, but considering the huge cost of migration 
you’ll see SourceFunction and Source co-exist for some time, like the 
ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource as 
a pioneer has already migrated to the new Source API.

I think the API to end users didn't change a lot: both 
env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
and you could apply downstream transformations onto it. 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
 

Cheers,

Qingsheng

> On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream<EventData> source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> 
> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream<xxx>.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what ithe recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which make sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>> 
>> -- 
>> Piotr Domagalski
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski

Reply via email to