Hi Carlos,

You might want to join the discussion about FLIP-238 [1] to share your thoughts with us. Thanks!

Best regards,
Jing

[1] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt

On Thu, Jun 9, 2022 at 2:13 PM Sanabria, Carlos <carlos.sanab...@accenture.com> wrote:

> Thanks for your quick response!
>
> Yes, this is exactly what we were looking for!
> Seems like a really nice feature. Even better than the FromElementsSource
> we were asking for, because it allows generating the events dynamically.
>
> Is there any way we can vote for FLIP-238 to be accepted?
>
> -----Original Message-----
> From: Qingsheng Ren <renqs...@gmail.com>
> Sent: Thursday, June 9, 2022 12:16
> To: Sanabria, Carlos <carlos.sanab...@accenture.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: [External] Re: Source vs SourceFunction and testing
>
> Hi Carlos,
>
> FLIP-238 [1] is proposing a FLIP-27-based data generator source and I
> think this is what you are looking for. This FLIP was created just days ago
> so it may take some time to get accepted and released.
>
> [1] https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D238-253A-2BIntroduce-2BFLIP-2D27-2Dbased-2BData-2BGenerator-2BSource&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=HzoMy2jOOV5oDa2SuDfcm4cy2LK-9DOQ530Dmvi2r4f0jhOVKmqOTToQ6ArM3q1F&s=ZRTHrrDGp1m0Po50VeAFjAEQBjCM28naJRNWM4CZQoA&e=
>
> Cheers,
>
> Qingsheng
>
> On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos <carlos.sanab...@accenture.com> wrote:
> >
> > Hi everyone!
> >
> > Sorry for reopening the thread, but I am having some problems related to
> > this case while migrating our code from Flink 1.12 to Flink 1.15.
> >
> > We have a base project that encapsulates a ton of common code and
> > configurations. One of the abstractions we have is an AbstractDataStreamJob
> > class that has generic Sources and Sinks.
> > We implemented it like this since Flink 1.8, following the
> > recommendations of the Flink documentation [1]:
> >
> > "Apache Flink provides a JUnit rule called MiniClusterWithClientResource
> > for testing complete jobs against a local, embedded mini cluster.
> > ...
> > A few remarks on integration testing with MiniClusterWithClientResource:
> > - In order not to copy your whole pipeline code from production to test,
> > make sources and sinks pluggable in your production code and inject special
> > test sources and test sinks in your tests.
> > ..."
> >
> > This way, we can create the real Kafka Sources and Sinks in the Main
> > class of the job, and also create the test Sources and Sinks in the JUnit
> > tests, and inject them in the AbstractDataStreamJob class.
> >
> > The problem comes with the new Source interface and the end-to-end tests
> > against the local embedded mini cluster. Prior to Flink 1.15, we used the
> > FromElementsFunction to create the test SourceFunction. Now that we changed
> > the code to use the new Source interface, we cannot use the
> > FromElementsFunction anymore, and we haven't found an equivalent
> > FromElementsSource class with the same functionality but implemented using
> > the new Source API.
> >
> > We want to keep the same structure in the AbstractDataStreamJob class
> > (with generic and pluggable sources and sinks), as we think it is the most
> > elegant and generic solution.
> >
> > Is it planned to implement a FromElementsSource class that extends the
> > new Source API? Is there any other alternative that may serve as a
> > workaround for the moment?
> >
> > We have tried to implement a custom Source for this use case, but it
> > seems like an overwhelming task and we do not want to reinvent the wheel
> > either. If it is planned to implement the FromElementsSource, we'd rather
> > wait for it.
> >
> > Thanks!
> > Carlos
> >
> > [1] https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.org_flink_flink-2Ddocs-2Drelease-2D1.15_docs_dev_datastream_testing_-23junit-2Drule-2Dminiclusterwithclientresource&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=HzoMy2jOOV5oDa2SuDfcm4cy2LK-9DOQ530Dmvi2r4f0jhOVKmqOTToQ6ArM3q1F&s=RKTpSSHRudC_BMmTz9xhGOT91uAAbp7HPEejuTihHvU&e=
> >
> > -----Original Message-----
> > From: Qingsheng Ren <renqs...@gmail.com>
> > Sent: Wednesday, May 25, 2022 12:10
> > To: Piotr Domagalski <pi...@domagalski.com>
> > Cc: user@flink.apache.org
> > Subject: [External] Re: Source vs SourceFunction and testing
> >
> > Glad to see you have resolved the issue!
> >
> > If you want to learn more about the Source API, the Flink documentation [1]
> > has a detailed description of it. The original proposal FLIP-27 [2] is
> > also a good reference.
> >
> > [1] https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.org_flink_flink-2Ddocs-2Drelease-2D1.15_docs_dev_datastream_sources_&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=lQGFDQJRG2BADprHFhkCefHCPTjDTh-OGIz4xFl-1W8&e=
> > [2] https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterface&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZv5V3MIQ&e=
> >
> > Cheers,
> >
> > Qingsheng
> >
> > > On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >
> > > Thank you Qingsheng, this context helps a lot!
> > >
> > > And once again thank you all for being such a helpful community!
> > >
> > > P.S. I actually struggled for a bit trying to understand why my
> > > refactored solution, which accepts DataStream<>, wouldn't work ("no operators
> > > defined in the streaming topology"). Turns out my assumption that I can
> > > call StreamExecutionEnvironment.getExecutionEnvironment() multiple times
> > > and get the same environment was wrong. I had env.addSource and
> > > env.fromSource calls using one instance of the environment, but then called
> > > env.execute() on another instance :facepalm:
> > >
> > > On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> > > Hi Piotr,
> > >
> > > I’d like to share my understanding about this. Source and
> > > SourceFunction are both interfaces to data sources. SourceFunction was
> > > designed and introduced earlier, and as the project evolved, many
> > > shortcomings emerged. Therefore, the community redesigned the source
> > > interface and introduced the new Source API in FLIP-27 [1].
> > >
> > > Eventually we will deprecate SourceFunction and use Source as the
> > > only interface for all data sources, but considering the huge cost of
> > > migration, you’ll see SourceFunction and Source coexist for some time. For
> > > example, the ParallelTestSource you mentioned is still based on
> > > SourceFunction, while KafkaSource, as a pioneer, has already migrated to
> > > the new Source API.
> > >
> > > I think the API for end users didn't change a lot: both
> > > env.addSource(SourceFunction) and env.fromSource(Source) return a
> > > DataStream, and you can apply downstream transformations to it.
> > >
> > > [1] https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterface&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZv5V3MIQ&e=
> > >
> > > Cheers,
> > >
> > > Qingsheng
> > >
> > > > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> > > >
> > > > Hi Ken,
> > > >
> > > > Thanks Ken. I guess the problem I had was, as a complete newbie to
> > > > Flink, navigating the type system and still being confused about the
> > > > differences between Source, SourceFunction, DataStream, DataStreamOperator,
> > > > etc.
> > > >
> > > > I think the DataStream<> type is what I'm looking for? That is, then
> > > > I can use:
> > > >
> > > >     DataStream<EventData> source =
> > > >         env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > > >
> > > > when using KafkaSource in the normal setup, and
> > > >
> > > >     DataStream<EventData> s =
> > > >         env.addSource(new ParallelTestSource<>(...));
> > > >
> > > > when using the testing source [1].
> > > >
> > > > Does that sound right?
> > > >
> > > > [1] https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink-2Dtraining_blob_master_common_src_test_java_org_apache_flink_training_exercises_testing_ParallelTestSource.java-23L26&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=eAmu4e10Rx2sRi9WMCvaVlljXiKpph9rddEY4gT6wik&e=
> > > >
> > > > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> > > > Hi Piotr,
> > > >
> > > > The way I handle this is via a workflow class that uses a builder
> > > > approach to specifying inputs, outputs, and any other configuration
> > > > settings.
> > > >
> > > > The inputs are typically DataStream<xxx>.
> > > >
> > > > This way I can separate out the Kafka inputs, and use testing
> > > > sources that give me very precise control over the inputs (e.g. I can hold
> > > > up on right side data to ensure my stateful left join function is handling
> > > > deferred joins properly). I can also use Kafka unit test support (either
> > > > kafka-junit or Spring embedded Kafka) if needed.
> > > >
> > > > Then in the actual tool class (with a main method) I’ll wire up the
> > > > real Kafka sources, with whatever logic is required to convert the consumer
> > > > records to what the workflow is expecting.
> > > >
> > > > -- Ken
> > > >
> > > >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> > > >>
> > > >> Hi,
> > > >>
> > > >> I'm wondering: what is the recommended way to structure a job that
> > > >> one would like to test later on with `MiniCluster`?
> > > >>
> > > >> I've looked at the flink-training repository examples [1] and they
> > > >> tend to expose the main job as a class that accepts a `SourceFunction` and
> > > >> a `SinkFunction`, which makes sense.
> > > >> But then, my job is normally constructed with `KafkaSource`, which is
> > > >> then passed to `env.fromSource(...)`.
> > > >>
> > > >> Is there any recommended way of handling these discrepancies, i.e.
> > > >> having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> > > >>
> > > >> [1] https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink-2Dtraining_blob_05791e55ad7ff0358b5c57ea8f40eada4a1f626a_ride-2Dcleansing_src_test_java_org_apache_flink_training_exercises_ridecleansing_RideCleansingIntegrationTest.java-23L61&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=Kn2wMHDZwLCCp1FoG1WCmg-rfAS2577zxQnqpZfUdwU&e=
> > > >>
> > > >> --
> > > >> Piotr Domagalski
> > > >
> > > > --------------------------
> > > > Ken Krugler
> > > > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.scaleunlimited.com&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da&s=DYLpp8_j5uOXA4FnVMdSLmXZ3zkb2whztkDXJhux5r4&e=
> > > > Custom big data solutions
> > > > Flink, Pinot, Solr, Elasticsearch
> > > >
> > > > --
> > > > Piotr Domagalski
> > >
> > > --
> > > Piotr Domagalski
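
For readers following the thread, here is a minimal sketch of the builder-style workflow class Ken describes, with the input and output injected from outside. It is written in plain Java so that it runs without Flink on the classpath, and everything in it is hypothetical: `UppercaseWorkflow`, `setInput`, `setSink`, and `WorkflowDemo` are illustrative names, and `Iterable<String>` and `Consumer<String>` merely stand in for the `DataStream<...>` inputs and the sinks a real Flink job would accept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

// Hypothetical demo entry point: plays the role of a test that injects
// an in-memory input and a collecting sink instead of Kafka.
final class WorkflowDemo {
    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();

        new UppercaseWorkflow()
                .setInput(List.of("a", "b"))   // test input, no Kafka involved
                .setSink(collected::add)       // test sink collects results
                .run();

        System.out.println(collected); // prints [A, B]
    }
}

// Hypothetical workflow class. The business logic lives here and does not
// know where its input comes from or where its output goes.
final class UppercaseWorkflow {
    private Iterable<String> input;   // stand-in for DataStream<String>
    private Consumer<String> sink;    // stand-in for a Flink sink

    UppercaseWorkflow setInput(Iterable<String> input) {
        this.input = input;
        return this;
    }

    UppercaseWorkflow setSink(Consumer<String> sink) {
        this.sink = sink;
        return this;
    }

    void run() {
        if (input == null || sink == null) {
            throw new IllegalStateException("input and sink must both be set");
        }
        // The "pipeline": transform each record and hand it to the sink.
        for (String record : input) {
            sink.accept(record.toUpperCase(Locale.ROOT));
        }
    }
}
```

In a real job, the production main class would pass in a stream built with `env.fromSource(...)`, and a test would pass in one built with `env.addSource(...)`. Both would need to come from the same environment instance that later calls `execute()`, which is the pitfall Piotr mentions in his P.S.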