Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I
think this is what you are looking for. This FLIP was created just
days ago so it may take some time to get accepted and released.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,

Qingsheng

On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos
<carlos.sanab...@accenture.com> wrote:
>
> Hi everyone!
>
> Sorry for reopening the thread, but I am having some problems related to this 
> case while migrating our code from Flink 1.12 to Flink 1.15.
>
> We have a base project that encapsulates a ton of common code and 
> configurations. One of the abstractions we have is an AbstractDataStreamJob 
> class that has generic Sources and Sinks. We implemented it like this since 
> Flink 1.8, following the recommendations of the Flink documentation [1]:
>
> "Apache Flink provides a JUnit rule called MiniClusterWithClientResource for 
> testing complete jobs against a local, embedded mini cluster. called 
> MiniClusterWithClientResource.
> ...
> A few remarks on integration testing with MiniClusterWithClientResource:
> - In order not to copy your whole pipeline code from production to test, make 
> sources and sinks pluggable in your production code and inject special test 
> sources and test sinks in your tests.
> ..."
>
> This way, we can create the real Kafka Sources and Sinks in the Main class of 
> the job, and also create the test Sources and Sinks in the Junit tests, and 
> inject them in the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end to end tests 
> against the local embedded mini cluster. Prior to Flink 1.15, we used the 
> FromElementsFunction to create the test SourceFunction. Now that we changed 
> the code to use the new Source interface, we cannot use the 
> FromElementsFunction anymore, and we haven't found an equivalent 
> FromElementsSource class with the same functionality but implemented using 
> the new Source API.
>
> We want to keep the same structure in the AbstractDataStreamJob class (with 
> generic and pluggable sources and sinks), as we think it is the most elegant 
> and generic solution.
>
> Is it planned to implement a FromElementsSource class that extends the new 
> Source API? Is there any other alternative that may serve as a workaround for 
> the moment?
>
> We have tried to implement a custom Source for this use case, but it seems 
> like an overwhelming task and we do not want to reinvent the wheel either. If 
> it is planned to implement the FromElementsSource we'd rather prefer to wait 
> for it.
>
> Thanks!
> Carlos
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
>
> -----Original Message-----
> From: Qingsheng Ren <renqs...@gmail.com>
> Sent: miércoles, 25 de mayo de 2022 12:10
> To: Piotr Domagalski <pi...@domagalski.com>
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with 
> links and attachments.
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink document [1] has a 
> detailed description about it. The original proposal FLIP-27 [2] is also a 
> good reference.
>
> [1] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.org_flink_flink-2Ddocs-2Drelease-2D1.15_docs_dev_datastream_sources_&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=lQGFDQJRG2BADprHFhkCefHCPTjDTh-OGIz4xFl-1W8&e=
> [2] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterface&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZv5V3MIQ&e=
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> >
> > Thank you Qingsheng, this context helps a lot!
> >
> > And once again thank you all for being such a helpful community!
> >
> > P.S. I actually struggled for a bit trying to understand why my refactored 
> > solution which accepts DataStream<> wouldn't work ("no operators defined in 
> > the streaming topology"). Turns out, my assumption that I can call 
> > StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> > the same environment, was wrong. I had env.addSource and env.fromSource 
> > calls using one instance of the environment, but then called env.execute() 
> > on another instance :facepalm:
> >
> > On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> > Hi Piotr,
> >
> > I’d like to share my understanding about this. Source and SourceFunction 
> > are both interfaces to data sources. SourceFunction was designed and 
> > introduced earlier and as the project evolved, many shortcomings emerged. 
> > Therefore, the community re-designed the source interface and introduced 
> > the new Source API in FLIP-27 [1].
> >
> > Finally we will deprecate the SourceFunction and use Source as the only 
> > interface for all data sources, but considering the huge cost of migration 
> > you’ll see SourceFunction and Source co-exist for some time, like the 
> > ParallelTestSource you mentioned is still on SourceFunction, and 
> > KafkaSource as a pioneer has already migrated to the new Source API.
> >
> > I think the API to end users didn't change a lot: both 
> > env.addSource(SourceFunction) and env.fromSource(Source) return a 
> > DataStream, and you could apply downstream transformations onto it.
> >
> > [1]
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_
> > confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterfac
> > e&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDI
> > uCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jX
> > U50aRVltFNinifOKvurHPTzdPL1da&s=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZv5
> > V3MIQ&e=
> >
> > Cheers,
> >
> > Qingsheng
> >
> > > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >
> > > Hi Ken,
> > >
> > > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> > > navigating the type system and being still confused about differences 
> > > between Source, SourceFunction, DataStream, DataStreamOperator, etc.
> > >
> > > I think the DataStream<> type is what I'm looking for? That is, then I 
> > > can use:
> > >
> > > DataStream<EventData> source =
> > > env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > > when using KafkaSource in the normal setup
> > >
> > > and
> > > DataStream<EventData> s = env.addSource(new
> > > ParallelTestSource<>(...)); when using the testing source [1]
> > >
> > > Does that sound right?
> > >
> > > [1]
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apac
> > > he_flink-2Dtraining_blob_master_common_src_test_java_org_apache_flin
> > > k_training_exercises_testing_ParallelTestSource.java-23L26&d=DwIFaQ&
> > > c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZ
> > > gTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVl
> > > tFNinifOKvurHPTzdPL1da&s=eAmu4e10Rx2sRi9WMCvaVlljXiKpph9rddEY4gT6wik
> > > &e=
> > >
> > > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> 
> > > wrote:
> > > Hi Piotr,
> > >
> > > The way I handle this is via a workflow class that uses a builder 
> > > approach to specifying inputs, outputs, and any other configuration 
> > > settings.
> > >
> > > The inputs are typically DataStream<xxx>.
> > >
> > > This way I can separate out the Kafka inputs, and use testing sources 
> > > that give me very precise control over the inputs (e.g. I can hold up on 
> > > right side data to ensure my stateful left join junction is handling 
> > > deferred joins properly). I can also use Kafka unit test support (either 
> > > kafka-junit or Spring embedded Kafka) if needed.
> > >
> > > Then in the actual tool class (with a main method) I’ll wire up the real 
> > > Kafka sources, with whatever logic is required to convert the consumer 
> > > records to what the workflow is expecting.
> > >
> > > — Ken
> > >
> > >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> 
> > >> wrote:
> > >>
> > >> Hi,
> > >>
> > >> I'm wondering: what ithe recommended way to structure the job which one 
> > >> would like to test later on with `MiniCluster`.
> > >>
> > >> I've looked at the flink-training repository examples [1] and they tend 
> > >> to expose the main job as a class that accepts a `SourceFunction` and a 
> > >> `SinkFunction`, which make sense. But then, my job is normally 
> > >> constructed with `KafkaSource` which is then passed to 
> > >> `env.fromSource(...`.
> > >>
> > >> Is there any recommended way of handling these discrepancies, ie. having 
> > >> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> > >>
> > >> [1]
> > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apa
> > >> che_flink-2Dtraining_blob_05791e55ad7ff0358b5c57ea8f40eada4a1f626a_
> > >> ride-2Dcleansing_src_test_java_org_apache_flink_training_exercises_
> > >> ridecleansing_RideCleansingIntegrationTest.java-23L61&d=DwIFaQ&c=eI
> > >> GjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTN
> > >> bdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltF
> > >> NinifOKvurHPTzdPL1da&s=Kn2wMHDZwLCCp1FoG1WCmg-rfAS2577zxQnqpZfUdwU&
> > >> e=
> > >>
> > >> --
> > >> Piotr Domagalski
> > >
> > > --------------------------
> > > Ken Krugler
> > > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.scaleunlimit
> > > ed.com&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q615
> > > 7zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQock
> > > pCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=DYLpp8_j5uOXA4FnVMdSLmXZ3zk
> > > b2whztkDXJhux5r4&e=
> > > Custom big data solutions
> > > Flink, Pinot, Solr, Elasticsearch
> > >
> > >
> > >
> > >
> > >
> > > --
> > > Piotr Domagalski
> >
> >
> >
> > --
> > Piotr Domagalski
>
>
> ________________________________
>
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Where allowed by local law, 
> electronic communications with Accenture and its affiliates, including e-mail 
> and instant messaging (including content), may be scanned by our systems for 
> the purposes of information security and assessment of internal compliance 
> with Accenture policy. Your privacy is important to us. Accenture uses your 
> personal data only in compliance with data protection laws. For further 
> information on how Accenture processes your personal data, please see our 
> privacy statement at https://www.accenture.com/us-en/privacy-policy.
> ______________________________________________________________________________________
>
> www.accenture.com

Reply via email to