Hi Carlos,

You might want to join the discussion about FLIP-238 [1] to share your
thoughts with us. Thanks!

Best regards,
Jing

[1] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt


On Thu, Jun 9, 2022 at 2:13 PM Sanabria, Carlos <
carlos.sanab...@accenture.com> wrote:

> Thanks for your quick response!
>
> Yes, this is exactly what we were looking for!
> It seems like a really nice feature, even better than the FromElementsSource
> we were asking for, because it allows generating the events dynamically.
>
> Is there any way we can vote for FLIP-238 to be accepted?
>
> -----Original Message-----
> From: Qingsheng Ren <renqs...@gmail.com>
> Sent: Thursday, June 9, 2022 12:16
> To: Sanabria, Carlos <carlos.sanab...@accenture.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: [External] Re: Source vs SourceFunction and testing
>
> Hi Carlos,
>
> FLIP-238 [1] proposes a FLIP-27-based data generator source, and I
> think this is what you are looking for. This FLIP was created just days ago,
> so it may take some time to be accepted and released.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source
>
> Cheers,
>
> Qingsheng
>
> On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos <
> carlos.sanab...@accenture.com> wrote:
> >
> > Hi everyone!
> >
> > Sorry for reopening the thread, but I am having some problems related to
> > this case while migrating our code from Flink 1.12 to Flink 1.15.
> >
> > We have a base project that encapsulates a ton of common code and
> > configuration. One of the abstractions we have is an AbstractDataStreamJob
> > class that has generic Sources and Sinks. We have implemented it this way
> > since Flink 1.8, following the recommendations of the Flink documentation [1]:
> >
> > "Apache Flink provides a JUnit rule called MiniClusterWithClientResource
> > for testing complete jobs against a local, embedded mini cluster.
> > ...
> > A few remarks on integration testing with MiniClusterWithClientResource:
> > - In order not to copy your whole pipeline code from production to test,
> >   make sources and sinks pluggable in your production code and inject
> >   special test sources and test sinks in your tests.
> > ..."
> >
> > This way, we can create the real Kafka Sources and Sinks in the Main
> > class of the job, create the test Sources and Sinks in the JUnit
> > tests, and inject either set into the AbstractDataStreamJob class.
> >
> > The problem comes with the new Source interface and the end-to-end tests
> > against the local embedded mini cluster. Prior to Flink 1.15, we used
> > FromElementsFunction to create the test SourceFunction. Now that we have
> > changed the code to use the new Source interface, we cannot use
> > FromElementsFunction anymore, and we haven't found an equivalent
> > FromElementsSource class with the same functionality implemented on
> > the new Source API.
> >
> > We want to keep the same structure in the AbstractDataStreamJob class
> > (with generic and pluggable sources and sinks), as we think it is the most
> > elegant and generic solution.
> >
> > Are there plans to implement a FromElementsSource class based on the
> > new Source API? Is there any other alternative that could serve as a
> > workaround for the moment?
> >
> > We have tried to implement a custom Source for this use case, but it
> > seems like an overwhelming task, and we do not want to reinvent the wheel
> > either. If a FromElementsSource is planned, we'd rather wait for it.
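One possible interim workaround, sketched under two assumptions (Flink 1.15 with flink-streaming-java on the classpath, and a job abstraction that accepts a `DataStream` rather than a `Source`): the FLIP-27 `NumberSequenceSource` emits the indices 0 to n-1, and a `map` turns each index into the element at that position. `FromElementsWorkaround` and `fromElements` are hypothetical names, not Flink API. Unlike `FromElementsFunction`, ordering is only preserved when the pipeline runs with parallelism 1, and the elements must be `java.io.Serializable` because they travel inside the map function.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical helper: emulates a "FromElementsSource" on top of the FLIP-27
// NumberSequenceSource. A sketch for tests, not an official Flink API.
public final class FromElementsWorkaround {

    private FromElementsWorkaround() {}

    public static <T> DataStream<T> fromElements(
            StreamExecutionEnvironment env, List<T> elements, TypeInformation<T> type) {
        if (elements.isEmpty()) {
            // NumberSequenceSource needs a non-empty, inclusive range [from, to].
            throw new IllegalArgumentException("elements must not be empty");
        }
        // ArrayList is Serializable, so the copy can be shipped inside the map
        // function (the elements themselves must be Serializable too).
        final ArrayList<T> copy = new ArrayList<>(elements);
        return env.fromSource(
                        new NumberSequenceSource(0, copy.size() - 1),
                        WatermarkStrategy.noWatermarks(),
                        "from-elements-workaround")
                // Turn each generated index back into the element at that position.
                .map(index -> copy.get(index.intValue()))
                // The lambda's generic return type is erased, so declare it explicitly.
                .returns(type);
    }
}
```

In a test, `FromElementsWorkaround.fromElements(env, List.of("a", "b"), Types.STRING)` could then stand in for the `DataStream` that production code obtains from `env.fromSource(kafkaSource, ...)`.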
> >
> > Thanks!
> > Carlos
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
> >
> > -----Original Message-----
> > From: Qingsheng Ren <renqs...@gmail.com>
> > Sent: Wednesday, May 25, 2022 12:10
> > To: Piotr Domagalski <pi...@domagalski.com>
> > Cc: user@flink.apache.org
> > Subject: [External] Re: Source vs SourceFunction and testing
> >
> > Glad to see you have resolved the issue!
> >
> > If you want to learn more about the Source API, the Flink documentation [1]
> > describes it in detail. The original proposal, FLIP-27 [2], is also a
> > good reference.
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > Cheers,
> >
> > Qingsheng
> >
> > > On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >
> > > Thank you Qingsheng, this context helps a lot!
> > >
> > > And once again thank you all for being such a helpful community!
> > >
> > > P.S. I actually struggled for a bit trying to understand why my
> > > refactored solution, which accepts a DataStream<>, wouldn't work ("no
> > > operators defined in the streaming topology"). It turns out my assumption
> > > that I can call StreamExecutionEnvironment.getExecutionEnvironment()
> > > multiple times and get the same environment was wrong. I had env.addSource
> > > and env.fromSource calls using one instance of the environment, but then
> > > called env.execute() on another instance :facepalm:
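A minimal sketch of the fix, assuming Flink 1.15: call `getExecutionEnvironment()` once, hand that same instance to every method that adds sources or operators, and call `execute()` on it. `SingleEnvironmentJob` and `defineTopology` are hypothetical names.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SingleEnvironmentJob {

    // Adds the whole topology to the environment it is given. It never calls
    // getExecutionEnvironment() itself, so everything lands on one instance.
    static void defineTopology(StreamExecutionEnvironment env) {
        env.fromElements("a", "", "b")
                .filter(s -> !s.isEmpty()) // drop empty strings
                .print();
    }

    public static void main(String[] args) throws Exception {
        // Obtain the environment exactly once...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        defineTopology(env);
        // ...and execute that same instance. A second, freshly obtained
        // environment would have no operators and could not be executed.
        env.execute("single-environment-job");
    }
}
```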
> > >
> > > On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> > > Hi Piotr,
> > >
> > > I’d like to share my understanding of this. Source and
> > > SourceFunction are both interfaces to data sources. SourceFunction was
> > > designed and introduced earlier, and as the project evolved, many
> > > shortcomings emerged. Therefore, the community redesigned the source
> > > interface and introduced the new Source API in FLIP-27 [1].
> > >
> > > Eventually, we will deprecate SourceFunction and make Source the only
> > > interface for all data sources. Considering the huge cost of migration,
> > > however, you'll see SourceFunction and Source co-exist for some time: the
> > > ParallelTestSource you mentioned is still based on SourceFunction, while
> > > KafkaSource, as a pioneer, has already migrated to the new Source API.
> > >
> > > I think the API exposed to end users hasn't changed much: both
> > > env.addSource(SourceFunction) and env.fromSource(Source) return a
> > > DataStream, to which you can apply downstream transformations.
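To illustrate, a sketch assuming Flink 1.15 (class and method names are hypothetical): one stream comes from the legacy path, `env.addSource(...)` with the deprecated `StatefulSequenceSource` standing in for any `SourceFunction`, and the other from the FLIP-27 path, `env.fromSource(...)` with `NumberSequenceSource`. Both yield a `DataStream<Long>`, so the same downstream method applies to either.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;

public class TwoSourceApis {

    // Downstream logic only sees a DataStream, regardless of the source API.
    static DataStream<Long> doubled(DataStream<Long> input) {
        return input.map(x -> x * 2);
    }

    // Legacy API: a SourceFunction registered with addSource(...).
    // StatefulSequenceSource is deprecated; it is used here only as a handy example.
    static DataStream<Long> viaSourceFunction(StreamExecutionEnvironment env) {
        return env.addSource(new StatefulSequenceSource(1, 3));
    }

    // FLIP-27 API: a Source registered with fromSource(...), which additionally
    // takes a WatermarkStrategy and a source name.
    static DataStream<Long> viaSource(StreamExecutionEnvironment env) {
        return env.fromSource(
                new NumberSequenceSource(1, 3), WatermarkStrategy.noWatermarks(), "numbers");
    }
}
```

Both sequences are inclusive of their bounds, so either path feeds 1, 2 and 3 into `doubled`.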
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >
> > > Cheers,
> > >
> > > Qingsheng
> > >
> > > > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> > > >
> > > > Hi Ken,
> > > >
> > > > Thanks! I guess my problem was that, as a complete newbie to Flink,
> > > > I was navigating the type system while still being confused about the
> > > > differences between Source, SourceFunction, DataStream,
> > > > DataStreamOperator, etc.
> > > >
> > > > I think the DataStream<> type is what I'm looking for? That is, I can
> > > > then use this when using KafkaSource in the normal setup:
> > > >
> > > >     DataStream<EventData> source =
> > > >         env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > > >
> > > > and this when using the testing source [1]:
> > > >
> > > >     DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> > > >
> > > > Does that sound right?
> > > >
> > > > [1]
> > > > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > > >
> > > > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> > > > Hi Piotr,
> > > >
> > > > The way I handle this is via a workflow class that uses a builder
> > > > approach to specifying inputs, outputs, and any other configuration
> > > > settings.
> > > >
> > > > The inputs are typically DataStream<xxx>.
> > > >
> > > > This way I can separate out the Kafka inputs and use testing sources
> > > > that give me very precise control over the inputs (e.g. I can hold back
> > > > right-side data to ensure my stateful left join function is handling
> > > > deferred joins properly). I can also use Kafka unit test support (either
> > > > kafka-junit or Spring embedded Kafka) if needed.
> > > >
> > > > Then in the actual tool class (with a main method) I’ll wire up the
> > > > real Kafka sources, with whatever logic is required to convert the
> > > > consumer records to what the workflow is expecting.
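A minimal sketch of such a builder-style workflow class, assuming Flink 1.15. `EnrichmentWorkflow`, `setInput`, `setMinLength` and the filter/uppercase logic are hypothetical placeholders. To keep it short, the sketch returns the result stream and leaves attaching the output (a Kafka sink in production, a collecting sink in tests) to the caller, rather than also configuring outputs through the builder.

```java
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical builder-style workflow: the input arrives as a DataStream, so
// the tool class and the tests can feed it from different sources.
public class EnrichmentWorkflow {

    private DataStream<String> input;
    private int minLength = 1; // example configuration setting

    public EnrichmentWorkflow setInput(DataStream<String> input) {
        this.input = input;
        return this;
    }

    public EnrichmentWorkflow setMinLength(int minLength) {
        this.minLength = minLength;
        return this;
    }

    // Wires up the transformations and returns the result; the caller
    // attaches whatever sink fits (production or test).
    public DataStream<String> build() {
        if (input == null) {
            throw new IllegalStateException("input must be set");
        }
        // Copy to a local so the lambda does not capture the (non-serializable) builder.
        final int min = minLength;
        return input
                .filter(s -> s.length() >= min)
                .map(String::toUpperCase)
                .returns(String.class);
    }
}
```

In the tool class, `setInput` would receive the stream from `env.fromSource(kafkaSource, watermarkStrategy, "Kafka")` after any conversion of the consumer records; in a test it can receive `env.fromElements(...)`.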
> > > >
> > > > — Ken
> > > >
> > > >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> > > >>
> > > >> Hi,
> > > >>
> > > >> I'm wondering: what is the recommended way to structure a job that
> > > >> one would like to test later on with `MiniCluster`?
> > > >>
> > > >> I've looked at the flink-training repository examples [1], and they
> > > >> tend to expose the main job as a class that accepts a `SourceFunction`
> > > >> and a `SinkFunction`, which makes sense. But my job is normally
> > > >> constructed with a `KafkaSource`, which is then passed to `env.fromSource(...)`.
> > > >>
> > > >> Is there any recommended way of handling this discrepancy, i.e.
> > > >> having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> > > >>
> > > >> [1]
> > > >> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> > > >>
> > > >> --
> > > >> Piotr Domagalski
> > > >
> > > > --------------------------
> > > > Ken Krugler
> > > > http://www.scaleunlimited.com
> > > > Custom big data solutions
> > > > Flink, Pinot, Solr, Elasticsearch
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Piotr Domagalski
> > >
> > >
> > >
> > > --
> > > Piotr Domagalski
> >
> >
>
