Hi Carlos, FLIP-238 [1] is proposing a FLIP-27-based data generator source and I think this is what you are looking for. This FLIP was created just days ago so it may take some time to get accepted and released.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,
Qingsheng

On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos <carlos.sanab...@accenture.com> wrote:
>
> Hi everyone!
>
> Sorry for reopening the thread, but I am having some problems related to this case while migrating our code from Flink 1.12 to Flink 1.15.
>
> We have a base project that encapsulates a lot of common code and configuration. One of the abstractions we have is an AbstractDataStreamJob class that has generic sources and sinks. We have implemented it like this since Flink 1.8, following the recommendations of the Flink documentation [1]:
>
> "Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.
> ...
> A few remarks on integration testing with MiniClusterWithClientResource:
> - In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
> ..."
>
> This way, we can create the real Kafka sources and sinks in the main class of the job, create the test sources and sinks in the JUnit tests, and inject them into the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end-to-end tests against the local embedded mini cluster. Prior to Flink 1.15, we used the FromElementsFunction to create the test SourceFunction. Now that we have changed the code to use the new Source interface, we cannot use the FromElementsFunction anymore, and we haven't found an equivalent FromElementsSource class with the same functionality implemented with the new Source API.
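One possible stopgap on Flink 1.15, sketched below but untested, is to drive the test elements from `env.fromSequence()`, which is backed by the FLIP-27 `NumberSequenceSource`, and map the generated indices onto a fixed list. The class and method names here are illustrative, not Flink API; the captured list must be serializable and non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromElementsWorkaround {
    // Emits a fixed, non-empty list of elements through a FLIP-27-based
    // source: fromSequence() uses NumberSequenceSource under the hood,
    // so no legacy SourceFunction is involved.
    public static DataStream<String> fromElements(
            StreamExecutionEnvironment env, List<String> elements) {
        ArrayList<String> copy = new ArrayList<>(elements); // serializable capture
        return env.fromSequence(0, copy.size() - 1)
                  .map(i -> copy.get(i.intValue()));
    }
}
```

Because the sequence source is bounded, a mini-cluster job built on it terminates on its own, much like one built on FromElementsFunction.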
>
> We want to keep the same structure in the AbstractDataStreamJob class (with generic and pluggable sources and sinks), as we think it is the most elegant and generic solution.
>
> Is there a plan to implement a FromElementsSource class based on the new Source API? Is there any other alternative that may serve as a workaround for the moment?
>
> We have tried to implement a custom Source for this use case, but it seems like an overwhelming task and we do not want to reinvent the wheel either. If a FromElementsSource is planned, we would rather wait for it.
>
> Thanks!
> Carlos
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
>
> -----Original Message-----
> From: Qingsheng Ren <renqs...@gmail.com>
> Sent: Wednesday, May 25, 2022 12:10
> To: Piotr Domagalski <pi...@domagalski.com>
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink documentation [1] has a detailed description of it. The original proposal, FLIP-27 [2], is also a good reference.
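The pluggable-source/sink structure Carlos describes earlier in the thread can be sketched in a Flink-free form (every name below is a hypothetical stand-in, not Flink API): the topology is written once against injected source and sink handles, so the production main class wires Kafka while tests wire in-memory fakes.

```java
import java.util.List;
import java.util.function.Function;

// Flink-free sketch of the dependency-injection pattern described above.
// In real code Source/Sink would be Flink's Source/Sink interfaces and the
// topology would operate on DataStream rather than List.
final class PluggableJob<IN, OUT> {
    interface Source<T> { List<T> read(); }        // stand-in for a Flink Source
    interface Sink<T> { void write(List<T> out); } // stand-in for a Flink Sink

    private final Source<IN> source;
    private final Sink<OUT> sink;
    private final Function<List<IN>, List<OUT>> topology;

    PluggableJob(Source<IN> source, Sink<OUT> sink,
                 Function<List<IN>, List<OUT>> topology) {
        this.source = source;
        this.sink = sink;
        this.topology = topology;
    }

    void run() {
        // Same pipeline code in production and in tests;
        // only the injected endpoints differ.
        sink.write(topology.apply(source.read()));
    }
}
```

A test constructs the same job class with a fixed-list source and a collecting sink, then asserts on what the sink received.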
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> >
> > Thank you Qingsheng, this context helps a lot!
> >
> > And once again thank you all for being such a helpful community!
> >
> > P.S. I actually struggled for a bit trying to understand why my refactored solution which accepts DataStream<> wouldn't work ("no operators defined in the streaming topology"). It turns out my assumption that I can call StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get the same environment was wrong. I had env.addSource and env.fromSource calls using one instance of the environment, but then called env.execute() on another instance :facepalm:
> >
> > On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> > Hi Piotr,
> >
> > I'd like to share my understanding of this. Source and SourceFunction are both interfaces to data sources. SourceFunction was designed and introduced earlier, and as the project evolved, many shortcomings emerged. Therefore, the community re-designed the source interface and introduced the new Source API in FLIP-27 [1].
> >
> > Eventually the SourceFunction interface will be deprecated and Source will be the only interface for all data sources, but considering the huge cost of migration you'll see SourceFunction and Source co-exist for some time. For example, the ParallelTestSource you mentioned is still built on SourceFunction, while KafkaSource, as a pioneer, has already migrated to the new Source API.
> >
> > I think the API for end users didn't change a lot: both env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, and you can apply downstream transformations onto it.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > Cheers,
> >
> > Qingsheng
> >
> > > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >
> > > Hi Ken,
> > >
> > > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, navigating the type system and still being confused about the differences between Source, SourceFunction, DataStream, DataStreamOperator, etc.
> > >
> > > I think the DataStream<> type is what I'm looking for? That is, I can use:
> > >
> > > DataStream<EventData> source = env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > >
> > > when using KafkaSource in the normal setup, and
> > >
> > > DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> > >
> > > when using the testing source [1].
> > >
> > > Does that sound right?
> > >
> > > [1] https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > >
> > > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> > > Hi Piotr,
> > >
> > > The way I handle this is via a workflow class that uses a builder approach to specifying inputs, outputs, and any other configuration settings.
> > >
> > > The inputs are typically DataStream<xxx>.
> > >
> > > This way I can separate out the Kafka inputs, and use testing sources that give me very precise control over the inputs (e.g. I can hold up on right side data to ensure my stateful left join junction is handling deferred joins properly). I can also use Kafka unit test support (either kafka-junit or Spring embedded Kafka) if needed.
> > >
> > > Then in the actual tool class (with a main method) I'll wire up the real Kafka sources, with whatever logic is required to convert the consumer records to what the workflow is expecting.
> > >
> > > — Ken
> > >
> > >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >>
> > >> Hi,
> > >>
> > >> I'm wondering: what is the recommended way to structure a job which one would like to test later on with `MiniCluster`?
> > >>
> > >> I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`, which makes sense. But then, my job is normally constructed with `KafkaSource`, which is then passed to `env.fromSource(...`.
> > >>
> > >> Is there any recommended way of handling these discrepancies, i.e. having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> > >>
> > >> [1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> > >>
> > >> --
> > >> Piotr Domagalski
> > >
> > > --------------------------
> > > Ken Krugler
> > > http://www.scaleunlimited.com
> > > Custom big data solutions
> > > Flink, Pinot, Solr, Elasticsearch
> > >
> > > --
> > > Piotr Domagalski
> >
> > --
> > Piotr Domagalski
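Ken's builder-style workflow class from earlier in the thread can be sketched Flink-free as follows (all names hypothetical; in real code the input would be a `DataStream` set by the tool class or by a test, not a `List`):

```java
import java.util.List;
import java.util.function.Function;

// Flink-free sketch of a builder-style workflow: inputs and the topology
// are set individually, and build() validates the wiring, so a test that
// forgets to inject an input fails fast instead of silently running an
// empty topology.
final class Workflow<IN, OUT> {
    private final List<IN> input; // stand-in for DataStream<IN>
    private final Function<List<IN>, List<OUT>> topology;

    private Workflow(List<IN> input, Function<List<IN>, List<OUT>> topology) {
        this.input = input;
        this.topology = topology;
    }

    List<OUT> run() { return topology.apply(input); }

    static final class Builder<IN, OUT> {
        private List<IN> input;
        private Function<List<IN>, List<OUT>> topology;

        Builder<IN, OUT> setInput(List<IN> input) {
            this.input = input;
            return this;
        }

        Builder<IN, OUT> setTopology(Function<List<IN>, List<OUT>> topology) {
            this.topology = topology;
            return this;
        }

        Workflow<IN, OUT> build() {
            if (input == null || topology == null) {
                throw new IllegalStateException("workflow not fully wired");
            }
            return new Workflow<>(input, topology);
        }
    }
}
```

Production code and tests then call the same builder, differing only in what they pass to setInput().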