Re: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Jing Ge
Hi Carlos,

You might want to join the discussion about FLIP-238[1] to share your
thoughts with us. Thanks!

Best regards,
Jing

[1] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt
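
For readers landing on this thread later: FLIP-238 was eventually accepted, and the generator source it proposed ships in recent Flink releases (1.17+) as DataGeneratorSource in the flink-connector-datagen module. A minimal sketch of that eventual API; class names are taken from those later releases, not from anything available at the time of this thread:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Events are generated dynamically from a (Long index) -> element function.
        GeneratorFunction<Long, String> generator = index -> "event-" + index;
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(generator, 100, Types.STRING);

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data Generator");
        stream.print();
        env.execute("FLIP-238 data generator demo");
    }
}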


On Thu, Jun 9, 2022 at 2:13 PM Sanabria, Carlos <
carlos.sanab...@accenture.com> wrote:

> Thanks for your quick response!
>
> Yes, this is exactly what we were looking for!
> Seems like a really nice feature. Even better than the FromElementsSource
> we were asking for, because it allows generating the events dynamically.
>
> Is there any way we can vote for the FLIP-238 to be accepted?
>
> -Original Message-
> From: Qingsheng Ren 
> Sent: jueves, 9 de junio de 2022 12:16
> To: Sanabria, Carlos 
> Cc: user 
> Subject: Re: [External] Re: Source vs SourceFunction and testing
>
> Hi Carlos,
>
> FLIP-238 [1] is proposing a FLIP-27-based data generator source and I
> think this is what you are looking for. This FLIP was created just days ago
> so it may take some time to get accepted and released.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source
>
> Cheers,
>
> Qingsheng
>
> On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos <
> carlos.sanab...@accenture.com> wrote:
> >
> > Hi everyone!
> >
> > Sorry for reopening the thread, but I am having some problems related to
> this case while migrating our code from Flink 1.12 to Flink 1.15.
> >
> > We have a base project that encapsulates a ton of common code and
> configurations. One of the abstractions we have is an AbstractDataStreamJob
> class that has generic Sources and Sinks. We implemented it like this since
> Flink 1.8, following the recommendations of the Flink documentation [1]:
> >
> > "Apache Flink provides a JUnit rule called MiniClusterWithClientResource
> for testing complete jobs against a local, embedded mini cluster. called
> MiniClusterWithClientResource.
> > ...
> > A few remarks on integration testing with MiniClusterWithClientResource:
> > - In order not to copy your whole pipeline code from production to test,
> make sources and sinks pluggable in your production code and inject special
> test sources and test sinks in your tests.
> > ..."
> >
> > This way, we can create the real Kafka Sources and Sinks in the Main
> class of the job, and also create the test Sources and Sinks in the JUnit
> tests, and inject them in the AbstractDataStreamJob class.
> >
> > The problem comes with the new Source interface and the end-to-end tests
> against the local embedded mini cluster. Prior to Flink 1.15, we used the
> FromElementsFunction to create the test SourceFunction. Now that we changed
> the code to use the new Source interface, we cannot use the
> FromElementsFunction anymore, and we haven't found an equivalent
> FromElementsSource class with the same functionality but implemented using
> the new Source API.
> >
> > We want to keep the same structure in the AbstractDataStreamJob class
> (with generic and pluggable sources and sinks), as we think it is the most
> elegant and generic solution.
> >
> > Is it planned to implement a FromElementsSource class that implements the
> new Source API? Is there any other alternative that may serve as a
> workaround for the moment?
> >
> > We have tried to implement a custom Source for this use case, but it
> seems like an overwhelming task and we do not want to reinvent the wheel
> either. If it is planned to implement the FromElementsSource we'd rather
> wait for it.
> >
> > Thanks!
> > Carlos
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
> >
> > -Original Message-
> > From: Qingsheng Ren 
> > Sent: miércoles, 25 de mayo de 2022 12:10
> > To: Piotr Domagalski 
> > Cc: user@flink.apache.org
> > Subject: [External] Re: Source vs SourceFunction and testing
> >
> > This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with
> > links and attachments.
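
Until FLIP-238 landed, there was a workaround available on Flink 1.15 itself for the missing FromElementsSource: drive the bounded, FLIP-27-based NumberSequenceSource and map indices back to a fixed element list. A minimal sketch; the fromElements helper name is made up for illustration, and the element list must be serializable:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class FromElementsWorkaround {

    // Hypothetical helper: emits each element of a (serializable) list once,
    // through a bounded FLIP-27 source, so it plugs into env.fromSource-based jobs.
    static DataStream<String> fromElements(StreamExecutionEnvironment env, List<String> elements) {
        return env
                .fromSource(
                        new NumberSequenceSource(0, elements.size() - 1),
                        WatermarkStrategy.noWatermarks(),
                        "test-elements")
                .map(i -> elements.get(i.intValue()))
                .returns(Types.STRING);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        fromElements(env, Arrays.asList("a", "b", "c")).print();
        env.execute();
    }
}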

RE: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Sanabria, Carlos
Thanks for your quick response!

Yes, this is exactly what we were looking for!
Seems like a really nice feature. Even better than the FromElementsSource we 
were asking for, because it allows generating the events dynamically.

Is there any way we can vote for the FLIP-238 to be accepted?

-Original Message-
From: Qingsheng Ren  
Sent: jueves, 9 de junio de 2022 12:16
To: Sanabria, Carlos 
Cc: user 
Subject: Re: [External] Re: Source vs SourceFunction and testing

Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I think 
this is what you are looking for. This FLIP was created just days ago so it may 
take some time to get accepted and released.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,

Qingsheng

On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos  
wrote:
>
> Hi everyone!
>
> Sorry for reopening the thread, but I am having some problems related to this 
> case while migrating our code from Flink 1.12 to Flink 1.15.
>
> We have a base project that encapsulates a ton of common code and 
> configurations. One of the abstractions we have is an AbstractDataStreamJob 
> class that has generic Sources and Sinks. We implemented it like this since 
> Flink 1.8, following the recommendations of the Flink documentation [1]:
>
> "Apache Flink provides a JUnit rule called MiniClusterWithClientResource for 
> testing complete jobs against a local, embedded mini cluster. called 
> MiniClusterWithClientResource.
> ...
> A few remarks on integration testing with MiniClusterWithClientResource:
> - In order not to copy your whole pipeline code from production to test, make 
> sources and sinks pluggable in your production code and inject special test 
> sources and test sinks in your tests.
> ..."
>
> This way, we can create the real Kafka Sources and Sinks in the Main class of 
> the job, and also create the test Sources and Sinks in the JUnit tests, and 
> inject them in the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end-to-end tests 
> against the local embedded mini cluster. Prior to Flink 1.15, we used the 
> FromElementsFunction to create the test SourceFunction. Now that we changed 
> the code to use the new Source interface, we cannot use the 
> FromElementsFunction anymore, and we haven't found an equivalent 
> FromElementsSource class with the same functionality but implemented using 
> the new Source API.
>
> We want to keep the same structure in the AbstractDataStreamJob class (with 
> generic and pluggable sources and sinks), as we think it is the most elegant 
> and generic solution.
>
> Is it planned to implement a FromElementsSource class that implements the new 
> Source API? Is there any other alternative that may serve as a workaround for 
> the moment?
>
> We have tried to implement a custom Source for this use case, but it seems 
> like an overwhelming task and we do not want to reinvent the wheel either. If 
> it is planned to implement the FromElementsSource we'd rather wait
> for it.
>
> Thanks!
> Carlos
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
>
> -Original Message-
> From: Qingsheng Ren 
> Sent: miércoles, 25 de mayo de 2022 12:10
> To: Piotr Domagalski 
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with 
> links and attachments.
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink document [1] has a 
> detailed description about it. The original proposal FLIP-27 [2] is also a 
> good reference.
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Re: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Qingsheng Ren
Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I
think this is what you are looking for. This FLIP was created just
days ago so it may take some time to get accepted and released.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,

Qingsheng

On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos
 wrote:
>
> Hi everyone!
>
> Sorry for reopening the thread, but I am having some problems related to this 
> case while migrating our code from Flink 1.12 to Flink 1.15.
>
> We have a base project that encapsulates a ton of common code and 
> configurations. One of the abstractions we have is an AbstractDataStreamJob 
> class that has generic Sources and Sinks. We implemented it like this since 
> Flink 1.8, following the recommendations of the Flink documentation [1]:
>
> "Apache Flink provides a JUnit rule called MiniClusterWithClientResource for 
> testing complete jobs against a local, embedded mini cluster. called 
> MiniClusterWithClientResource.
> ...
> A few remarks on integration testing with MiniClusterWithClientResource:
> - In order not to copy your whole pipeline code from production to test, make 
> sources and sinks pluggable in your production code and inject special test 
> sources and test sinks in your tests.
> ..."
>
> This way, we can create the real Kafka Sources and Sinks in the Main class of 
> the job, and also create the test Sources and Sinks in the JUnit tests, and 
> inject them in the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end-to-end tests 
> against the local embedded mini cluster. Prior to Flink 1.15, we used the 
> FromElementsFunction to create the test SourceFunction. Now that we changed 
> the code to use the new Source interface, we cannot use the 
> FromElementsFunction anymore, and we haven't found an equivalent 
> FromElementsSource class with the same functionality but implemented using 
> the new Source API.
>
> We want to keep the same structure in the AbstractDataStreamJob class (with 
> generic and pluggable sources and sinks), as we think it is the most elegant 
> and generic solution.
>
> Is it planned to implement a FromElementsSource class that implements the new 
> Source API? Is there any other alternative that may serve as a workaround for 
> the moment?
>
> We have tried to implement a custom Source for this use case, but it seems 
> like an overwhelming task and we do not want to reinvent the wheel either. If 
> it is planned to implement the FromElementsSource we'd rather wait
> for it.
>
> Thanks!
> Carlos
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
>
> -Original Message-
> From: Qingsheng Ren 
> Sent: miércoles, 25 de mayo de 2022 12:10
> To: Piotr Domagalski 
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with 
> links and attachments.
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink document [1] has a 
> detailed description about it. The original proposal FLIP-27 [2] is also a 
> good reference.
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 17:54, Piotr Domagalski  wrote:
> >
> > Thank you Qingsheng, this context helps a lot!
> >
> > And once again thank you all for being such a helpful community!
> >
> > P.S. I actually struggled for a bit trying to understand why my refactored 
> > solution which accepts DataStream<> wouldn't work ("no operators defined in 
> > the streaming topology"). Turns out, my assumption that I can call 
> > StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> > the same environment, was wrong. I had env.addSource and env.fromSource 
> > calls using one instance of the environment, but then called env.execute() on
> > another instance :facepalm:

RE: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Sanabria, Carlos
Hi everyone!

Sorry for reopening the thread, but I am having some problems related to this 
case while migrating our code from Flink 1.12 to Flink 1.15.

We have a base project that encapsulates a ton of common code and 
configurations. One of the abstractions we have is an AbstractDataStreamJob 
class that has generic Sources and Sinks. We implemented it like this since 
Flink 1.8, following the recommendations of the Flink documentation [1]:

"Apache Flink provides a JUnit rule called MiniClusterWithClientResource for 
testing complete jobs against a local, embedded mini cluster. called 
MiniClusterWithClientResource.
...
A few remarks on integration testing with MiniClusterWithClientResource:
- In order not to copy your whole pipeline code from production to test, make 
sources and sinks pluggable in your production code and inject special test 
sources and test sinks in your tests.
..."

This way, we can create the real Kafka Sources and Sinks in the Main class of 
the job, and also create the test Sources and Sinks in the JUnit tests, and 
inject them in the AbstractDataStreamJob class.

The problem comes with the new Source interface and the end-to-end tests 
against the local embedded mini cluster. Prior to Flink 1.15, we used the 
FromElementsFunction to create the test SourceFunction. Now that we changed the 
code to use the new Source interface, we cannot use the FromElementsFunction 
anymore, and we haven't found an equivalent FromElementsSource class with the 
same functionality but implemented using the new Source API.

We want to keep the same structure in the AbstractDataStreamJob class (with 
generic and pluggable sources and sinks), as we think it is the most elegant 
and generic solution.

Is it planned to implement a FromElementsSource class that implements the new 
Source API? Is there any other alternative that may serve as a workaround for 
the moment?

We have tried to implement a custom Source for this use case, but it seems like 
an overwhelming task and we do not want to reinvent the wheel either. If it is 
planned to implement the FromElementsSource we'd rather wait for it.

Thanks!
Carlos

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource

-Original Message-
From: Qingsheng Ren 
Sent: miércoles, 25 de mayo de 2022 12:10
To: Piotr Domagalski 
Cc: user@flink.apache.org
Subject: [External] Re: Source vs SourceFunction and testing

This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with links 
and attachments.

Glad to see you have resolved the issue!

If you want to learn more about the Source API, the Flink document [1] has a 
detailed description about it. The original proposal FLIP-27 [2] is also a good 
reference.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Cheers,

Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski  wrote:
>
> Thank you Qingsheng, this context helps a lot!
>
> And once again thank you all for being such a helpful community!
>
> P.S. I actually struggled for a bit trying to understand why my refactored 
> solution which accepts DataStream<> wouldn't work ("no operators defined in 
> the streaming topology"). Turns out, my assumption that I can call 
> StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> the same environment, was wrong. I had env.addSource and env.fromSource calls 
> using one instance of the environment, but then called env.execute() on 
> another instance :facepalm:
>
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:
> Hi Piotr,
>
> I’d like to share my understanding about this. Source and SourceFunction are 
> both interfaces to data sources. SourceFunction was designed and introduced 
> earlier and as the project evolved, many shortcomings emerged. Therefore, the 
> community re-designed the source interface and introduced the new Source API 
> in FLIP-27 [1].
>
> Eventually we will deprecate the SourceFunction and use Source as the only 
> interface for all data sources, but considering the huge cost of migration 
> you’ll see SourceFunction and Source co-exist for some time, like the 
> ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource
> as a pioneer has already migrated to the new Source API.

Re: SourceFunction

2022-06-08 Thread Jing Ge
Hi Alexey,

There is a thread [1] discussing this issue right now. It would be great if
you could share some thoughts about your experience. Thanks!

Best regards,
Jing

[1] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9

On Wed, Jun 8, 2022 at 4:42 PM Alexey Trenikhun  wrote:

> Hello,
> Is there a plan to deprecate SourceFunction in favor of the Source API? We have
> a custom SourceFunction-based source; do we need to plan to rewrite it using
> the new Source API?
>
> Thanks,
> Alexey
>


SourceFunction

2022-06-08 Thread Alexey Trenikhun
Hello,
Is there a plan to deprecate SourceFunction in favor of the Source API? We have 
a custom SourceFunction-based source; do we need to plan to rewrite it using the 
new Source API?

Thanks,
Alexey


Re: Source vs SourceFunction and testing

2022-05-25 Thread Qingsheng Ren
Glad to see you have resolved the issue! 

If you want to learn more about the Source API, the Flink document [1] has a 
detailed description about it. The original proposal FLIP-27 [2] is also a good 
reference. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Cheers, 

Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski  wrote:
> 
> Thank you Qingsheng, this context helps a lot!
> 
> And once again thank you all for being such a helpful community!
> 
> P.S. I actually struggled for a bit trying to understand why my refactored 
> solution which accepts DataStream<> wouldn't work ("no operators defined in 
> the streaming topology"). Turns out, my assumption that I can call 
> StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> the same environment, was wrong. I had env.addSource and env.fromSource calls 
> using one instance of the environment, but then called env.execute() on 
> another instance :facepalm:
> 
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:
> Hi Piotr,
> 
> I’d like to share my understanding about this. Source and SourceFunction are 
> both interfaces to data sources. SourceFunction was designed and introduced 
> earlier and as the project evolved, many shortcomings emerged. Therefore, the 
> community re-designed the source interface and introduced the new Source API 
> in FLIP-27 [1]. 
> 
> Eventually we will deprecate the SourceFunction and use Source as the only 
> interface for all data sources, but considering the huge cost of migration 
> you’ll see SourceFunction and Source co-exist for some time, like the 
> ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource 
> as a pioneer has already migrated to the new Source API.
> 
> I think the API to end users didn't change a lot: both 
> env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
> and you could apply downstream transformations onto it. 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>  
> 
> Cheers,
> 
> Qingsheng
> 
> > On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> > 
> > Hi Ken,
> > 
> > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> > navigating the type system and being still confused about differences 
> > between Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> > 
> > I think the DataStream<> type is what I'm looking for? That is, then I can 
> > use:
> > 
> > DataStream source = env.fromSource(getKafkaSource(params), 
> > watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> > 
> > and
> > DataStream s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> > 
> > Does that sound right?
> > 
> > [1] 
> > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > 
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> > wrote:
> > Hi Piotr,
> > 
> > The way I handle this is via a workflow class that uses a builder approach 
> > to specifying inputs, outputs, and any other configuration settings.
> > 
> > The inputs are typically DataStream.
> > 
> > This way I can separate out the Kafka inputs, and use testing sources that 
> > give me very precise control over the inputs (e.g. I can hold up on right 
> > side data to ensure my stateful left join junction is handling deferred 
> > joins properly). I can also use Kafka unit test support (either kafka-junit 
> > or Spring embedded Kafka) if needed.
> > 
> > Then in the actual tool class (with a main method) I’ll wire up the real 
> > Kafka sources, with whatever logic is required to convert the consumer 
> > records to what the workflow is expecting.
> > 
> > — Ken
> > 
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> >> 
> >> Hi,
> >> 
> >> I'm wondering: what is the recommended way to structure the job which one 
> >> would like to test later on with `MiniCluster`.
> >> 
> >> I've looked at the flink-training repository examples [1] and they tend to 
> >> expose the main job as a class that accepts a `SourceFunction` and a 
> >> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> >> with `KafkaSource` which is then passed to `env.fromSource(...`.

Re: Source vs SourceFunction and testing

2022-05-25 Thread Piotr Domagalski
Thank you Qingsheng, this context helps a lot!

And once again thank you all for being such a helpful community!

P.S. I actually struggled for a bit trying to understand why my refactored
solution which accepts DataStream<> wouldn't work ("no operators defined in
the streaming topology"). Turns out, my assumption that I can
call StreamExecutionEnvironment.getExecutionEnvironment() multiple times
and get the same environment, was wrong. I had env.addSource and
env.fromSource calls using one instance of the environment, but then called
env.execute() on another instance :facepalm:

On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:

> Hi Piotr,
>
> I’d like to share my understanding about this. Source and SourceFunction
> are both interfaces to data sources. SourceFunction was designed and
> introduced earlier and as the project evolved, many shortcomings emerged.
> Therefore, the community re-designed the source interface and introduced
> the new Source API in FLIP-27 [1].
>
> Eventually we will deprecate the SourceFunction and use Source as the only
> interface for all data sources, but considering the huge cost of migration
> you’ll see SourceFunction and Source co-exist for some time, like the
> ParallelTestSource you mentioned is still on SourceFunction, and
> KafkaSource as a pioneer has already migrated to the new Source API.
>
> I think the API to end users didn't change a lot: both
> env.addSource(SourceFunction) and env.fromSource(Source) return a
> DataStream, and you could apply downstream transformations onto it.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 03:19, Piotr Domagalski 
> wrote:
> >
> > Hi Ken,
> >
> > Thanks Ken. I guess the problem I had was, as a complete newbie to
> Flink, navigating the type system and being still confused about
> differences between Source, SourceFunction, DataStream, DataStreamOperator,
> etc.
> >
> > I think the DataStream<> type is what I'm looking for? That is, then I
> can use:
> >
> > DataStream source = env.fromSource(getKafkaSource(params),
> watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> >
> > and
> > DataStream s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> >
> > Does that sound right?
> >
> > [1]
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> >
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
> wrote:
> > Hi Piotr,
> >
> > The way I handle this is via a workflow class that uses a builder
> approach to specifying inputs, outputs, and any other configuration
> settings.
> >
> > The inputs are typically DataStream.
> >
> > This way I can separate out the Kafka inputs, and use testing sources
> that give me very precise control over the inputs (e.g. I can hold up on
> right side data to ensure my stateful left join junction is handling
> deferred joins properly). I can also use Kafka unit test support (either
> kafka-junit or Spring embedded Kafka) if needed.
> >
> > Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
> >
> > — Ken
> >
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
> >>
> >> Hi,
> >>
> >> I'm wondering: what is the recommended way to structure the job which one
> would like to test later on with `MiniCluster`.
> >>
> >> I've looked at the flink-training repository examples [1] and they tend
> to expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> >>
> >> Is there any recommended way of handling these discrepancies, ie.
> having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >>
> >> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >>
> >> --
> >> Piotr Domagalski
> >
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data solutions
> > Flink, Pinot, Solr, Elasticsearch
> >
> >
> >
> >
> >
> > --
> > Piotr Domagalski
>
>

-- 
Piotr Domagalski
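

A minimal sketch of the pitfall Piotr describes in his P.S.: in a plain JVM, StreamExecutionEnvironment.getExecutionEnvironment() hands back a separate environment on each call, so building the topology on one instance and executing another fails with "no operators defined in the streaming topology":

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvPitfall {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment envA = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment envB = StreamExecutionEnvironment.getExecutionEnvironment();

        envA.fromElements(1, 2, 3).print(); // the topology is registered on envA only

        // envB.execute(); // would fail: envB has no operators
        envA.execute();    // works: execute the same instance the sources were added to
    }
}

The fix is simply to create the environment once and pass that single instance to whatever builds the sources, sinks, and execution call.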


Re: Source vs SourceFunction and testing

2022-05-24 Thread Qingsheng Ren
Hi Piotr,

I’d like to share my understanding about this. Source and SourceFunction are 
both interfaces to data sources. SourceFunction was designed and introduced 
earlier and as the project evolved, many shortcomings emerged. Therefore, the 
community re-designed the source interface and introduced the new Source API in 
FLIP-27 [1]. 

Eventually we will deprecate the SourceFunction and use Source as the only 
interface for all data sources, but considering the huge cost of migration 
you’ll see SourceFunction and Source co-exist for some time, like the 
ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource as 
a pioneer has already migrated to the new Source API.

I think the API to end users didn't change a lot: both 
env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
and you could apply downstream transformations onto it. 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
 

Cheers,

Qingsheng

> On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
>> 
>> Hi,
>> 
> >> I'm wondering: what is the recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
> >> `SinkFunction`, which makes sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski



Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

Yes, that should work (using DataStream as the common result of both 
source creation options).

— Ken

> On May 24, 2022, at 12:19 PM, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what is the recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which makes sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>> 
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi Ken,

Thanks Ken. I guess the problem I had was, as a complete newbie to Flink,
navigating the type system and being still confused about differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, then I can
use:

DataStream source = env.fromSource(getKafkaSource(params),
watermarkStrategy, "Kafka");
when using KafkaSource in the normal setup

and
DataStream s = env.addSource(new ParallelTestSource<>(...));
when using the testing source [1]

Does that sound right?

[1]
https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26

On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
wrote:

> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream.
>
> This way I can separate out the Kafka inputs, and use testing sources that
> give me very precise control over the inputs (e.g. I can hold up on right
> side data to ensure my stateful left join junction is handling deferred
> joins properly). I can also use Kafka unit test support (either kafka-junit
> or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
>
> — Ken
>
> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure the job which one
> would like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
>
> Is there any recommended way of handling these discrepancies, ie. having
> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Piotr Domagalski
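

A minimal sketch of the structure this thread converges on: the pipeline core only sees a DataStream<String>, production wires a KafkaSource through env.fromSource(...), and a MiniCluster test can wire any test SourceFunction through env.addSource(...) to produce the same type. The Kafka connection settings below are illustrative placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyJob {

    // Pipeline core: source- and sink-agnostic, so a test can feed it a stream
    // built from env.addSource(new ParallelTestSource<>(...)) instead of Kafka.
    static DataStream<String> buildPipeline(DataStream<String> source) {
        return source.map(String::toUpperCase);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafka = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("input-topic")              // placeholder
                .setGroupId("my-job")                  // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> source =
                env.fromSource(kafka, WatermarkStrategy.noWatermarks(), "Kafka");
        buildPipeline(source).print();
        env.execute();
    }
}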


Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

The way I handle this is via a workflow class that uses a builder approach to 
specifying inputs, outputs, and any other configuration settings.

The inputs are typically DataStream.

This way I can separate out the Kafka inputs, and use testing sources that give 
me very precise control over the inputs (e.g. I can hold up on right side data 
to ensure my stateful left join junction is handling deferred joins properly). 
I can also use Kafka unit test support (either kafka-junit or Spring embedded 
Kafka) if needed.

Then in the actual tool class (with a main method) I’ll wire up the real Kafka 
sources, with whatever logic is required to convert the consumer records to 
what the workflow is expecting.

— Ken

> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> 
> Hi,
> 
> I'm wondering: what is the recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
> 
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> 
> Is there any recommended way of handling these discrepancies, ie. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> 
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Source vs SourceFunction and testing

2022-05-24 Thread Aeden Jameson
Depending on the kind of testing you're hoping to do you may want to
look into https://github.com/mguenther/kafka-junit. For example,
if you're looking for some job-level smoke tests that just answer the
question "Is everything wired up correctly?"  Personally, I like how
this approach doesn't require you to open up the design for the sake
of testing.


On Tue, May 24, 2022 at 8:34 AM Piotr Domagalski  wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> with `KafkaSource` which is then passed to `env.fromSource(...`.
>
> Is there any recommended way of handling these discrepancies, ie. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj


Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi,

I'm wondering: what is the recommended way to structure the job which one
would like to test later on with `MiniCluster`.

I've looked at the flink-training repository examples [1] and they tend to
expose the main job as a class that accepts a `SourceFunction` and a
`SinkFunction`, which makes sense. But then, my job is normally constructed
with `KafkaSource` which is then passed to `env.fromSource(...`.

Is there any recommended way of handling these discrepancies, ie. having to
use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?

[1]
https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61

-- 
Piotr Domagalski


Re: How to access Task.isBackPressured() from a SourceFunction?

2022-02-14 Thread Niklas Semmler
Hi Darren,

No, you cannot access the Task from the operator. You can access some metrics 
via the RuntimeContext.

getRuntimeContext().getMetricGroup() 

How does the backpressure help you here? Backpressure can originate in any 
operator or network connection. If it originates in an operator further downstream, 
it may take some time until it propagates back to the source. What do you want to do in 
response? Switch to a slower source and then switch back?

Best regards,
Niklas

> On 9. Feb 2022, at 07:01, Darren Whobrey  wrote:
> 
> Hi, is there a way for the UDF of a source function, extended from 
> RichParallelSourceFunction, to access its Task instance, so as to call 
> Task.isBackPressured()?
> I’m trying to give priorities to different input sources that need to be 
> managed from within the same source function and want to stop reading from 
> one source according to a utilization statistic that is partially based on 
> whether there is backpressure. 
> Thank you!
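
A minimal sketch of Niklas's pointer: the source cannot reach its Task (and thus Task.isBackPressured()), but it can register its own metrics on the operator metric group via the RuntimeContext. The metric name here is an arbitrary example, and the busy loop stands in for whatever multi-source prioritization logic you actually need:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MeteredSource extends RichParallelSourceFunction<Long> {

    private transient Counter emitted;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        // Registers a counter on this subtask's operator metric group.
        emitted = getRuntimeContext().getMetricGroup().counter("recordsEmitted");
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i++);
                emitted.inc();
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}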



How to access Task.isBackPressured() from a SourceFunction?

2022-02-08 Thread Darren Whobrey
Hi, is there a way for the UDF of a source function, extended from 
RichParallelSourceFunction, to access its Task instance, so as to call 
Task.isBackPressured()?
I'm trying to give priorities to different input sources that need to be 
managed from within the same source function and want to stop reading from one 
source according to a utilization statistic that is partially based on whether 
there is backpressure.
Thank you!


Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread Ingo Bürk
Hi Oscar,

I think you'll find your answers in [1]; have a look at Yun's response a
couple of emails down. Basically, SourceFunction is the legacy source stack,
and ideally you'd instead implement your source using the FLIP-27 stack [2]
where you can directly define the boundedness, but he also mentioned a
workaround.


Regards
Ingo

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Kafka-as-bounded-source-with-DataStream-API-in-batch-mode-Flink-1-12-td40637.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/#the-data-source-api

On Thu, Jun 3, 2021 at 7:29 AM 陳樺威  wrote:

> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild states for our streaming application.
> The Flink app will be run on-demand and close after completing all the file
> processing.
> We implement a SourceFunction [1] to consume bounded Parquet files from
> GCS. However, the function will be detected as Batch Mode.
>
> Our question is, how to implement a SourceFunction as a Bounded DataStream?
>
> Thanks!
> Oscar
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
>
>
>
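
A minimal sketch of the FLIP-27 route Ingo points to, written with Flink 1.15+ class names (assumed; the text-line format class was named differently in 1.13): a FileSource built without continuous monitoring reports itself as bounded, so the job can run in BATCH execution mode, whereas a legacy SourceFunction is treated as an unbounded streaming source. The GCS path is a placeholder, and the text format stands in for a real Parquet StreamFormat:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedFileJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // requires all sources to be bounded

        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("gs://bucket/history/"))
                .build(); // no monitorContinuously(...) => the source reports itself as bounded

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "gcs-history")
                .print();
        env.execute("state bootstrap");
    }
}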


Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread 陳樺威
Sorry, there are some typos that may be misleading.

The SourceFunction will be detected as *Streaming Mode*.

On Thu, Jun 3, 2021 at 1:29 PM 陳樺威 wrote:

> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild states for our streaming application.
> The Flink app will be run on-demand and close after completing all the file
> processing.
> We implement a SourceFunction [1] to consume bounded Parquet files from
> GCS. However, the function will be detected as Batch Mode.
>
> Our question is, how to implement a SourceFunction as a Bounded DataStream?
>
> Thanks!
> Oscar
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
>
>
>


SourceFunction cannot run in Batch Mode

2021-06-02 Thread 陳樺威
Hi,

Currently, we want to use batch execution mode [0] to consume historical
data and rebuild states for our streaming application.
The Flink app will be run on-demand and close after completing all the file
processing.
We implement a SourceFunction [1] to consume bounded Parquet files from GCS.
However, the function will be detected as Batch Mode.

Our question is, how to implement a SourceFunction as a Bounded DataStream?

Thanks!
Oscar

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html


Re: Call run() of another SourceFunction inside run()?

2021-04-14 Thread Piotr Nowojski
Hi,

I think it should work. At least off the top of my head I do not see
any reason why it shouldn't.

Just make sure that you are proxying all relevant methods, not only those
defined in `SourceFunction`. For example `FlinkKafkaConsumer` is
implementing/extending: `RichParallelSourceFunction`,
`CheckpointListener`, `CheckpointedFunction` and  `ResultTypeQueryable`,
so if you want to wrap `FlinkKafkaConsumer`, you would need to proxy all of
those interfaces/calls from your `WrappingSourceFunction` to the
`innerSourceFunction`.

Best,
Piotrek

śr., 14 kwi 2021 o 11:36 Schneider, Jochen 
napisał(a):

> Hi!
>
> To work around FLINK-2491
> <https://issues.apache.org/jira/browse/FLINK-2491> which causes
> checkpointing issues for us I am trying to chain SourceFunctions so that
> the first one never quits. The basic idea is as follows:
>
> class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner])
>     extends SourceFunction[Outer] {
>
>   override def run(outerCtx: SourceContext[Outer]): Unit = {
>     outerCtx.collect(...)
>     val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
>     innerSourceFunction.run(innerCtx)
>   }
>
>   override def cancel() = innerSourceFunction.cancel()
> }
>
> Is it ok to call run() of a different SourceFunction inside of run() and
> implement my own SourceContext delegating to another one? It works for a
> small test running on a local Flink environment, but I am wondering if
> there could be any issues doing that on production.
>
> Thanks,
>
> Jochen
>
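
A Java sketch of the proxying Piotr describes, under the assumption that the inner source may also implement CheckpointedFunction, CheckpointListener, and ResultTypeQueryable; if the inner source is additionally a RichFunction (as FlinkKafkaConsumer is), you would also need to extend AbstractRichFunction and forward open/close/setRuntimeContext:

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WrappingSourceFunction<T> implements SourceFunction<T>,
        CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {

    private final SourceFunction<T> inner;

    public WrappingSourceFunction(SourceFunction<T> inner) {
        this.inner = inner;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // Emit your own records first, then hand the (possibly wrapped)
        // context to the inner source so it keeps using the checkpoint lock.
        inner.run(ctx);
    }

    @Override
    public void cancel() {
        inner.cancel();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        if (inner instanceof CheckpointedFunction) {
            ((CheckpointedFunction) inner).snapshotState(ctx);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        if (inner instanceof CheckpointedFunction) {
            ((CheckpointedFunction) inner).initializeState(ctx);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (inner instanceof CheckpointListener) {
            ((CheckpointListener) inner).notifyCheckpointComplete(checkpointId);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public TypeInformation<T> getProducedType() {
        if (inner instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<T>) inner).getProducedType();
        }
        throw new IllegalStateException("Inner source does not expose its produced type");
    }
}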


Call run() of another SourceFunction inside run()?

2021-04-14 Thread Schneider, Jochen
Hi!

To work around FLINK-2491<https://issues.apache.org/jira/browse/FLINK-2491> 
which causes checkpointing issues for us I am trying to chain SourceFunctions 
so that the first one never quits. The basic idea is as follows:

class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner])
    extends SourceFunction[Outer] {

  override def run(outerCtx: SourceContext[Outer]): Unit = {
    outerCtx.collect(...)
    val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
    innerSourceFunction.run(innerCtx)
  }

  override def cancel() = innerSourceFunction.cancel()
}

Is it ok to call run() of a different SourceFunction inside of run() and 
implement my own SourceContext delegating to another one? It works for a small 
test running on a local Flink environment, but I am wondering if there could be 
any issues doing that on production.

Thanks,

Jochen


Re: Passing a custom SourceContext to a SourceFunction

2019-05-16 Thread Chesnay Schepler

You cannot control what kind of SourceContext is passed into your function.

What are you trying to achieve?

On 15/05/2019 09:30, Debasish Ghosh wrote:

Hi -

I have a custom SourceFunction ..

class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
  def run(ctx: SourceContext[T]): Unit = {
    data.foreach(d ⇒ ctx.collect(d))
  }
}

When this function is run during job execution, the SourceContext that 
gets passed serializes the data. I would like to pass a mock 
SourceContext (similar to 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java) 
in the run method. How do I do this ? Note I am not invoking the run 
method explicitly anywhere.


Any help will be appreciated.

regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
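

You can, however, drive a SourceFunction directly in a unit test by calling run() yourself with a hand-rolled context (the ListSourceContext linked above lives in Flink's test sources and is not public API). A minimal collecting sketch:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {

    private final List<T> collected = new ArrayList<>();
    private final Object lock = new Object();

    @Override
    public void collect(T element) {
        collected.add(element);
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        collected.add(element); // timestamp ignored in this sketch
    }

    @Override
    public void emitWatermark(Watermark mark) {
        // ignored in this sketch
    }

    @Override
    public void markAsTemporarilyIdle() {
        // ignored in this sketch
    }

    @Override
    public Object getCheckpointLock() {
        return lock;
    }

    @Override
    public void close() {}

    public List<T> getCollected() {
        return collected;
    }
}

In a test you then invoke mySourceFunction.run(new CollectingSourceContext<>()) directly, bypassing job execution, and assert on getCollected().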





Passing a custom SourceContext to a SourceFunction

2019-05-15 Thread Debasish Ghosh
Hi -

I have a custom SourceFunction ..

class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
  def run(ctx: SourceContext[T]): Unit = {
    data.foreach(d ⇒ ctx.collect(d))
  }
}

When this function is run during job execution, the SourceContext that gets
passed serializes the data. I would like to pass a mock SourceContext
(similar to
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java)
in the run method. How do I do this ? Note I am not invoking the run method
explicitly anywhere.

Any help will be appreciated.

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: Flink Custom SourceFunction and SinkFunction

2019-03-04 Thread Piotr Nowojski
Hi,

I couldn’t find any references to your question, nor have I seen such a use 
case, but:

Re 1. 
It looks like it could work

Re 2.
It should work as well, but just try to use StreamingFileSink.

Re 3.
For a custom source/sink function, if you do not care about data processing 
guarantees it’s quite easy. If you have to achieve at-least-once or exactly-once, 
things might get more complicated. 
For exactly-once sink, you should start from `TwoPhaseCommitSinkFunction`. 
(Example usages check test class 
`TwoPhaseCommitSinkFunctionTest.ContentDumpSinkFunction`, or more complicated 
FlinkKafkaProducer)
For at-least-once sink, you can just flush/sync the output files on 
snapshot/checkpoint.
For source, you would have to manually keep the input offsets on Flink’s state. 

Re 4.

Regarding SFTP support: not that I’m aware of.
Regarding sources/sinks you can try to look at existing source/sinks 
implementations.

Piotrek

> On 1 Mar 2019, at 09:39, Siew Wai Yow  wrote:
> 
> Hi guys,
> 
> I have some questions regarding the title that need your expertise:
> 
> I need to build an SFTP SourceFunction; may I know if Hadoop's SFTPFileSystem
> is suitable?
> I need to build an SFTP SinkFunction as well; may I know if the pre-defined HDFS
> rolling file sink accepts SFTP connections, since SFTP is supported by the Hadoop
> file system?
> Any good reference on how to write custom source/sink?
> Any similar code to share?
> Thanks!
> 
> Regards,
> Yow
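
A minimal sketch of the at-least-once pattern from "Re 3." above: flush the destination in snapshotState(), so a checkpoint only completes once everything emitted so far is durable on the remote side. SftpClient is a hypothetical stand-in for a real SFTP client, present only so the sketch compiles:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class AtLeastOnceSftpSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private transient SftpClient client;

    @Override
    public void invoke(String value, Context context) throws Exception {
        client.write(value); // may be buffered client-side
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Flushing here means the checkpoint only completes once all records
        // emitted so far are durable on the remote side => at-least-once.
        client.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        client = new SftpClient();
    }

    // Hypothetical stand-in for a real SFTP client, only here so the sketch compiles.
    static class SftpClient {
        void write(String line) { /* append to a remote file */ }
        void flush() { /* sync the remote file */ }
    }
}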



Flink Custom SourceFunction and SinkFunction

2019-03-01 Thread Siew Wai Yow
Hi guys,

I have some questions regarding the title that need your expertise:


  1.  I need to build an SFTP SourceFunction; may I know if Hadoop's
SFTPFileSystem is suitable?
  2.  I need to build an SFTP SinkFunction as well; may I know if the pre-defined
HDFS rolling file sink accepts SFTP connections, since SFTP is supported by the
Hadoop file system?
  3.  Any good reference on how to write custom source/sink?
  4.  Any similar code to share?

Thanks!

Regards,
Yow


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-12 Thread Aaron Levin
Hi Aljoscha,

Thanks! I will look into this.

Best,

Aaron Levin

On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I think for this case a model that is similar to how the Streaming File
> Source works should be good. You can have a look at
> ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The
> idea is that the first emits splits that should be processed and the second
> is responsible for reading those splits. A generic version of that is what
> I'm proposing for the refactoring of our source interface [1] that also
> comes with a prototype implementation [2].
>
> I think something like this should be adaptable to your case. The split
> enumerator would at first only emit file splits downstream, after that it
> would emit Kafka partitions that should be read. The split reader would
> understand both file splits and kafka partitions and can read from both.
> This still has some kinks to be worked out when it comes to watermarks,
> FLIP-27 is not finished.
>
> What do you think?
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2] https://github.com/aljoscha/flink/commits/refactor-source-interface
>
>
> On 1. Nov 2018, at 16:50, Aaron Levin  wrote:
>
> Hey,
>
> Thanks for reaching out! I'd love to take a step back and find a better
> solution, so I'll try to be succinct in what I'm trying to accomplish:
>
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets
> files in a specific order).
> * sends a watermark between each sequence file
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about
> (see: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink))
>
> The current solution I have involves a custom InputFormat, InputSplit, and
> SplitAssignor. It achieves most of these requirements, except I have to
> extend InputFormatSourceFunction. I have a class that looks like:
>
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
> KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
>
> There's a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is
> initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of
> InputFormatSourceFunction so I could insert Watermarks between splits.
>
> I'd love any suggestions around improving this!
>
> Best,
>
> Aaron Levin
>
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek 
> wrote:
>
>> Hi Aaron,
>>
>> I'd like to take a step back and understand why you're trying to wrap an
>> InputFormatSourceFunction?
>>
>> In my opinion, InputFormatSourceFunction should not be used because it
>> has some shortcomings, the most prominent among them that it does not
>> support checkpointing, i.e. in case of failure all data will (probably) be
>> read again. I'm saying probably because the interaction of
>> InputFormatSourceFunction with how InputSplits are generated (which relates
>> to that code snippet with the cast you found) could be somewhat "spooky"
>> and lead to weird results in some cases.
>>
>> The interface is a remnant of a very early version of the streaming API
>> and should probably be removed soon. I hope we can find a better solution
>> for your problem that fits better with Flink.
>>
>> Best,
>> Aljoscha
>>
>> On 1. Nov 2018, at 15:30, Aaron Levin  wrote:
>>
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
>> provide any insight or advice, that would be helpful!
>>
>> Thanks again.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin 
>> wrote:
>>
>>> Hey,
>>>
>>> Not sure how convo threading works on this list, so in case the folks
>>> CC'd missed my other response, here's some more info:
>>>
>>> First, I appreciate everyone's help! Thank you!
>>>
>>> I wrote several wrappers to try and debug this, including one which is
>>> an exact copy of `InputFormatSourceFunction` which also failed. They all
>>> failed with the same error I detail above. I'll post two of them below.
They all extended `RichParallelSourceFunction` and, as far as I could tell, were properly initialized (though I may have missed something!).

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-09 Thread Aljoscha Krettek
Hi,

I think for this case a model that is similar to how the Streaming File Source 
works should be good. You can have a look at ContinuousFileMonitoringFunction 
and ContinuousFileReaderOperator. The idea is that the first emits splits that 
should be processed and the second is responsible for reading those splits. A 
generic version of that is what I'm proposing for the refactoring of our source 
interface [1] that also comes with a prototype implementation [2].

I think something like this should be adaptable to your case. The split 
enumerator would at first only emit file splits downstream, after that it would 
emit Kafka partitions that should be read. The split reader would understand 
both file splits and kafka partitions and can read from both. This still has 
some kinks to be worked out when it comes to watermarks, FLIP-27 is not 
finished.
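
For illustration only, a minimal Scala sketch of that two-stage shape on the old SourceFunction API (all names here -- Split, FileSplit, KafkaPartition, SplitEnumeratorFunction, SplitReader -- are hypothetical placeholders, and this is not the FLIP-27 API):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.util.Collector

sealed trait Split
case class FileSplit(path: String) extends Split
case class KafkaPartition(topic: String, partition: Int) extends Split

// Stage 1: only enumerates work items -- first the file splits, then the
// Kafka partitions -- and emits them downstream.
class SplitEnumeratorFunction(files: Seq[String], partitions: Seq[KafkaPartition])
    extends SourceFunction[Split] {
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Split]): Unit = {
    files.foreach(f => if (isRunning) ctx.collect(FileSplit(f)))
    partitions.foreach(p => if (isRunning) ctx.collect(p))
  }

  override def cancel(): Unit = isRunning = false
}

// Stage 2: understands both kinds of split and does the actual reading.
class SplitReader extends RichFlatMapFunction[Split, String] {
  override def flatMap(split: Split, out: Collector[String]): Unit = split match {
    case FileSplit(path)                  => () // open the file and out.collect(...) its records
    case KafkaPartition(topic, partition) => () // poll the partition and out.collect(...) records
  }
}

Wired up as env.addSource(new SplitEnumeratorFunction(files, partitions)).flatMap(new SplitReader), the first stage controls the ordering while the second does the I/O; the watermark story between the two stages is exactly the open kink mentioned above.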

What do you think?

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2] https://github.com/aljoscha/flink/commits/refactor-source-interface

> On 1. Nov 2018, at 16:50, Aaron Levin  wrote:
> 
> Hey,
> 
> Thanks for reaching out! I'd love to take a step back and find a better 
> solution, so I'll try to be succinct in what I'm trying to accomplish:
> 
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets 
> files in a specific order).
> * sends a watermark between each sequence file 
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about (see: 
> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink))
>  
> 
> The current solution I have involves a custom InputFormat, InputSplit, and 
> SplitAssignor. It achieves most of these requirements, except I have to 
> extend InputFormatSourceFunction. I have a class that looks like:
> 
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: 
> KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
> 
> There's a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is 
> initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of InputFormatSourceFunction 
> so I could insert Watermarks between splits. 
> 
> I'd love any suggestions around improving this!
> 
> Best,
> 
> Aaron Levin
> 
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek wrote:
> Hi Aaron,
> 
> I'd like to take a step back and understand why you're trying to wrap an 
> InputFormatSourceFunction?
> 
> In my opinion, InputFormatSourceFunction should not be used because it has 
> some shortcomings, the most prominent among them that it does not support 
> checkpointing, i.e. in case of failure all data will (probably) be read 
> again. I'm saying probably because the interaction of 
> InputFormatSourceFunction with how InputSplits are generated (which relates 
> to that code snippet with the cast you found) could be somewhat "spooky" and 
> lead to weird results in some cases.
> 
> The interface is a remnant of a very early version of the streaming API and 
> should probably be removed soon. I hope we can find a better solution for 
> your problem that fits better with Flink.
> 
> Best,
> Aljoscha
> 
>> On 1. Nov 2018, at 15:30, Aaron Levin wrote:
>> 
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can 
>> provide any insight or advice, that would be helpful!
>> 
>> Thanks again.
>> 
>> Best,
>> 
>> Aaron Levin
>> 
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin wrote:
>> Hey,
>> 
>> Not sure how convo threading works on this list, so in case the folks CC'd 
>> missed my other response, here's some more info:
>> 
>> First, I appreciate everyone's help! Thank you! 
>> 
>> I wrote several wrappers to try and debug this, including one which is an 
>> exact copy of `InputFormatSourceFunction` which also failed. They all failed 
>> with the same error I detail above. I'll post two of them below. They all 
>> extended `RichParallelSourceFunction` and, as far as I could tell, were 
>

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aaron Levin
Hey,

Thanks for reaching out! I'd love to take a step back and find a better
solution, so I'll try to be succinct in what I'm trying to accomplish:

We're trying to write a SourceFunction which:
* reads some Sequence files from S3 in a particular order (each task gets
files in a specific order).
* sends a watermark between each sequence file
* when that's complete, starts reading from Kafka topics.
* (This is similar to the bootstrap problem which Lyft has talked about
(see:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink))

The current solution I have involves a custom InputFormat, InputSplit, and
SplitAssignor. It achieves most of these requirements, except I have to
extend InputFormatSourceFunction. I have a class that looks like:

class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}

There's a lot I don't like about the existing solution:
* I have to extend InputFormatSourceFunction to ensure the graph is
initialized properly (the bug I wrote about)
* I had to replicate most of the implementation of
InputFormatSourceFunction so I could insert Watermarks between splits
(see the sketch below).
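
A minimal sketch of that per-split watermark insertion, assuming the copied run() loop quoted later in this thread and a hypothetical splitMaxTimestamp tracked while reading each split:

import org.apache.flink.streaming.api.watermark.Watermark

// Inside the copied run() loop, once one split has been fully read:
format.close()
// splitMaxTimestamp is an assumed value, e.g. the largest event time seen in the split.
sourceContext.emitWatermark(new Watermark(splitMaxTimestamp))
if (isRunning) {
  isRunning = splitIterator.hasNext
}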

I'd love any suggestions around improving this!

Best,

Aaron Levin

On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek 
wrote:

> Hi Aaron,
>
> I'd like to take a step back and understand why you're trying to wrap an
> InputFormatSourceFunction?
>
> In my opinion, InputFormatSourceFunction should not be used because it has
> some shortcomings, the most prominent among them that it does not support
> checkpointing, i.e. in case of failure all data will (probably) be read
> again. I'm saying probably because the interaction of
> InputFormatSourceFunction with how InputSplits are generated (which relates
> to that code snippet with the cast you found) could be somewhat "spooky"
> and lead to weird results in some cases.
>
> The interface is a remnant of a very early version of the streaming API
> and should probably be removed soon. I hope we can find a better solution
> for your problem that fits better with Flink.
>
> Best,
> Aljoscha
>
> On 1. Nov 2018, at 15:30, Aaron Levin  wrote:
>
> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
> provide any insight or advice, that would be helpful!
>
> Thanks again.
>
> Best,
>
> Aaron Levin
>
> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin 
> wrote:
>
>> Hey,
>>
>> Not sure how convo threading works on this list, so in case the folks
>> CC'd missed my other response, here's some more info:
>>
>> First, I appreciate everyone's help! Thank you!
>>
>> I wrote several wrappers to try and debug this, including one which is an
>> exact copy of `InputFormatSourceFunction` which also failed. They all
>> failed with the same error I detail above. I'll post two of them below.
>> They all extended `RichParallelSourceFunction` and, as far as I could tell,
>> were properly initialized (though I may have missed something!).
>> Additionally, for the two below, if I change `extends
>> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
>> I no longer receive the exception. This is what led me to believe the
>> source of the issue was casting and how I found the line of code where the
>> stream graph is given the input format.
>>
>> Quick explanation of the wrappers:
>> 1. `WrappedInputFormat` does a basic wrap around
>> `InputFormatSourceFunction` and delegates all methods to the underlying
>> `InputFormatSourceFunction`
>> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
>> `InputFormatSourceFunction` source.
>> 3. They're being used in a test which looks vaguely like:
>> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>> InputFormatSourceFunction[String](source,
>> implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>
>> class WrappedInputFormat[A](
>>   inputFormat: InputFormatSourceFunction[A]
>> )(
>>   implicit typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit
>> = {
>> inputFormat.run(sourceContext)
>>   }
>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>> inputFormat.setRuntimeContext(t)
>>   }
>>   override def equals(obj: scala.Any) = {
>> inputFormat.equals(obj)
>>   }
>>   override def hashCode() = { inputFormat.hashCode() }
>>   override def toString = { inputFormat.toString }
>>   override def get

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aljoscha Krettek
 format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
> }
> 
> var nextElement: A = serializer.createInstance()
> try {
>   while (isRunning) {
>     format.open(splitIterator.next())
>     while (isRunning && !format.reachedEnd()) {
>       nextElement = format.nextRecord(nextElement)
>       if (nextElement != null) {
>         sourceContext.collect(nextElement)
>       } else {
>         break
>       }
>     }
>     format.close()
>     if (isRunning) {
>       isRunning = splitIterator.hasNext
>     }
>   }
> } finally {
>   format.close()
>   if (format.isInstanceOf[RichInputFormat[_,_]]) {
>     format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>   }
>   isRunning = false
> }
>   }
> 
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> 
>   override def close(): Unit = {
>     format.close()
>     if (format.isInstanceOf[RichInputFormat[_,_]]) {
>       format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>     }
>   }
> 
>   private def getInputSplits(): Iterator[InputSplit] = {
>     new Iterator[InputSplit] {
>       private var nextSplit: InputSplit = _
>       private var exhausted: Boolean = _
> 
>       override def hasNext: Boolean = {
>         if (exhausted) { return false }
>         if (nextSplit != null) { return true }
>         var split: InputSplit = null
> 
>         try {
>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>         } catch {
>           case e: InputSplitProviderException =>
>             throw new RuntimeException("No InputSplit Provider", e)
>         }
> 
>         if (split != null) {
>           nextSplit = split
>           true
>         } else {
>           exhausted = true
>           false
>         }
>       }
> 
>       override def next(): InputSplit = {
>         if (nextSplit == null && !hasNext) {
>           throw new NoSuchElementException()
>         }
>         val tmp: InputSplit = nextSplit
>         nextSplit = null
>         tmp
>       }
>     }
>   }
> }
> 
> 
> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz wrote:
> Hi Aaron,
> 
> Could you share the code of your custom function?
> 
> I am also adding Aljosha and Kostas to cc, who should be more helpful on that 
> topic.
> 
> Best,
> 
> Dawid
> 
> On 19/10/2018 20:06, Aaron Levin wrote:
>> Hi,
>> 
>> I'm writing a custom `SourceFunction` which wraps an underlying 
>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a 
>> stream (via `env.addSource` and a subsequent sink) I get errors related to 
>> the `InputSplitAssigner` not being initialized for a particular vertex ID. 
>> Full error here[1].
>> 
>> I believe the underlying error is related to this[0] call to `instanceof 
>> InputFormatSourceFunction`.
>> 
>> My questions:
>> 
>> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this 
>> error? Am I missing a chunk of the API covering this?
>> 2. Is the error I'm experiencing related to that casting call? If so, would 
>> y'all be open to a PR which adds an interface one can extend which will set 
>> the input format in the stream graph? Or is there a preferred way of 
>> achieving this?
>> 
>> Thanks!
>> 
>> Aaron Levin
>> 
>> [0] 
>> https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>> [1] 
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at 
>> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>> at 
>> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>> at REDACTED
>> at 
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> at 
>> org.apache.flink.str

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aaron Levin
cancel(): Unit = {
> isRunning = false
>   }
>
>   override def close(): Unit = {
> format.close()
> if(format.isInstanceOf[RichInputFormat[_,_]]) {
>   format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
> }
>   }
>
>   private def getInputSplits(): Iterator[InputSplit] = {
> new Iterator[InputSplit] {
>   private var nextSplit: InputSplit = _
>   private var exhausted: Boolean = _
>
>   override def hasNext: Boolean = {
> if(exhausted) { return false }
> if(nextSplit != null) { return true }
> var split: InputSplit = null
>
> try {
>   split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
> } catch {
>   case e: InputSplitProviderException =>
> throw new RuntimeException("No InputSplit Provider", e)
> }
>
> if(split != null) {
>   nextSplit = split
>   true
> } else {
>   exhausted = true
>   false
> }
>   }
>
>   override def next(): InputSplit = {
> if(nextSplit == null && !hasNext) {
>   throw new NoSuchElementException()
> }
> val tmp: InputSplit = nextSplit
> nextSplit = null
> tmp
>   }
>
> }
>   }
> }
>
>
> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz 
> wrote:
>
>> Hi Aaron,
>>
>> Could you share the code of your custom function?
>>
>> I am also adding Aljosha and Kostas to cc, who should be more helpful on
>> that topic.
>>
>> Best,
>>
>> Dawid
>> On 19/10/2018 20:06, Aaron Levin wrote:
>>
>> Hi,
>>
>> I'm writing a custom `SourceFunction` which wraps an underlying
>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
>> stream (via `env.addSource` and a subsequent sink) I get errors related to
>> the `InputSplitAssigner` not being initialized for a particular vertex ID.
>> Full error here[1].
>>
>> I believe the underlying error is related to this[0] call to `instanceof
>> InputFormatSourceFunction`.
>>
>> *My questions*:
>>
>> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this
>> error? Am I missing a chunk of the API covering this?
>> 2. Is the error I'm experiencing related to that casting call? If so, would
>> y'all be open to a PR which adds an interface one can extend which will set
>> the input format in the stream graph? Or is there a preferred way of
>> achieving this?
>>
>> Thanks!
>>
>> Aaron Levin
>>
>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>> [1]
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>> at REDACTED
>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>> Requesting the next input split failed.
>> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>> ... 8 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 9 more
>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
>> cbc357ccb763df2852fee8c4fc7d55f2
>> at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> ...
>>
>>
>


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-26 Thread Aaron Levin
  }

if(split != null) {
  nextSplit = split
  true
} else {
  exhausted = true
  false
}
  }

  override def next(): InputSplit = {
if(nextSplit == null && !hasNext) {
  throw new NoSuchElementException()
}
val tmp: InputSplit = nextSplit
nextSplit = null
tmp
  }

}
  }
}


On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz 
wrote:

> Hi Aaron,
>
> Could you share the code of your custom function?
>
> I am also adding Aljosha and Kostas to cc, who should be more helpful on
> that topic.
>
> Best,
>
> Dawid
> On 19/10/2018 20:06, Aaron Levin wrote:
>
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
> stream (via `env.addSource` and a subsequent sink) I get errors related to
> the `InputSplitAssigner` not being initialized for a particular vertex ID.
> Full error here[1].
>
> I believe the underlying error is related to this[0] call to `instanceof
> InputFormatSourceFunction`.
>
> *My questions*:
>
> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this error?
> Am I missing a chunk of the API covering this?
> 2. Is the error I'm experiencing related to that casting call? If so, would
> y'all be open to a PR which adds an interface one can extend which will set
> the input format in the stream graph? Or is there a preferred way of
> achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
> at REDACTED
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
> ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
> at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> ...
>
>


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Aaron Levin
austed = true
  false
}
  }

  override def next(): InputSplit = {
if(nextSplit == null && !hasNext) {
  throw new NoSuchElementException()
}
val tmp: InputSplit = nextSplit
nextSplit = null
tmp
  }

}
  }
}

Best,

Aaron Levin

On Wed, Oct 24, 2018 at 8:00 AM, Kien Truong 
wrote:

> Hi,
>
> Since InputFormatSourceFunction is a subclass of
> RichParallelSourceFunction, your wrapper should also extend this class.
>
> In addition, remember to overwrite the methods defined in the
> AbstractRichFunction interface and
>
> proxy the call to the underlying InputFormatSourceFunction, in order to
> initialize the underlying source correctly.
>
>
> Best regards,
>
> Kien
>
>
> On 10/20/2018 1:06 AM, Aaron Levin wrote:
>
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
> stream (via `env.addSource` and a subsequent sink) I get errors related to
> the `InputSplitAssigner` not being initialized for a particular vertex ID.
> Full error here[1].
>
> I believe the underlying error is related to this[0] call to `instanceof
> InputFormatSourceFunction`.
>
> *My questions*:
>
> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this error?
> Am I missing a chunk of the API covering this?
> 2. Is the error I'm experiencing related to that casting call? If so, would
> y'all be open to a PR which adds an interface one can extend which will set
> the input format in the stream graph? Or is there a preferred way of
> achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
> at REDACTED
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
> ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
> at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> ...
>
>


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Kien Truong

Hi,

Since InputFormatSourceFunction is a subclass of 
RichParallelSourceFunction, your wrapper should also extend this class.


In addition, remember to overwrite the methods defined in the 
AbstractRichFunction interface and


proxy the call to the underlying InputFormatSourceFunction, in order to 
initialize the underlying source correctly.
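
In code, the delegation Kien describes might look like the following untested sketch (DelegatingSource is an illustrative name):

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, RichParallelSourceFunction, SourceFunction}

class DelegatingSource[A](delegate: InputFormatSourceFunction[A])
    extends RichParallelSourceFunction[A] {

  // Proxy the AbstractRichFunction lifecycle so the delegate sees the same
  // runtime context and configuration as the wrapper itself.
  override def setRuntimeContext(ctx: RuntimeContext): Unit = delegate.setRuntimeContext(ctx)
  override def open(parameters: Configuration): Unit = delegate.open(parameters)
  override def close(): Unit = delegate.close()

  // Delegate the actual source behaviour.
  override def run(ctx: SourceFunction.SourceContext[A]): Unit = delegate.run(ctx)
  override def cancel(): Unit = delegate.cancel()
}

As the rest of this thread shows, though, this delegation alone does not cure the InputSplitAssigner error, because the stream graph only registers the input format when the source function itself is an InputFormatSourceFunction.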



Best regards,

Kien


On 10/20/2018 1:06 AM, Aaron Levin wrote:

Hi,

I'm writing a custom `SourceFunction` which wraps an underlying 
`InputFormatSourceFunction`. When I try to use this `SourceFunction` 
in a stream (via `env.addSource` and a subsequent sink) I get errors 
related to the `InputSplitAssigner` not being initialized for a 
particular vertex ID. Full error here[1].


I believe the underlying error is related to this[0] call to 
`instanceof InputFormatSourceFunction`.


_My questions_:

1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this 
error? Am I missing a chunk of the API covering this?
2. Is the error I'm experiencing related to that casting call? If so, 
would y'all be open to a PR which adds an interface one can extend 
which will set the input format in the stream graph? Or is there a 
preferred way of achieving this?


Thanks!

Aaron Levin

[0] 
https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480

[1]
java.lang.RuntimeException: Could not retrieve next input split.
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)

    at REDACTED
    at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
Requesting the next input split failed.
    at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)

    ... 8 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.Exception: No InputSplitAssigner for vertex ID 
cbc357ccb763df2852fee8c4fc7d55f2
    at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)

    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID 
cbc357ccb763df2852fee8c4fc7d55f2
    at 
org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)

...


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Dawid Wysakowicz
Hi Aaron,

Could you share the code of your custom function?

I am also adding Aljosha and Kostas to cc, who should be more helpful on
that topic.

Best,

Dawid

On 19/10/2018 20:06, Aaron Levin wrote:
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction`
> in a stream (via `env.addSource` and a subsequent sink) I get errors
> related to the `InputSplitAssigner` not being initialized for a
> particular vertex ID. Full error here[1].
>
> I believe the underlying error is related to this[0] call to
> `instanceof InputFormatSourceFunction`.
>
> _My questions_:
>
> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this
> error? Am I missing a chunk of the API covering this?
> 2. Is the error I'm experiencing related to that casting call? If so,
> would y'all be open to a PR which adds an interface one can extend
> which will set the input format in the stream graph? Or is there a
> preferred way of achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] 
> https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1] 
> java.lang.RuntimeException: Could not retrieve next input split.
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>     at REDACTED
>     at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
>     at
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
>     at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
>     at
> org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> ...




Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-19 Thread Aaron Levin
Hi,

I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
stream (via `env.addSource` and a subsequent sink) I get errors related to
the `InputSplitAssigner` not being initialized for a particular vertex ID.
Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof
InputFormatSourceFunction`.

*My questions*:

1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this error? Am
I missing a chunk of the API covering this?
2. Is the error I'm experiencing related to that casting call? If so, would
y'all be open to a PR which adds an interface one can extend which will set
the input format in the stream graph? Or is there a preferred way of
achieving this?

Thanks!

Aaron Levin

[0]
https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
[1]
java.lang.RuntimeException: Could not retrieve next input split.
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
at REDACTED
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
Requesting the next input split failed.
at
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No
InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
cbc357ccb763df2852fee8c4fc7d55f2
at
org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
...


Re: Test harness for validating proper checkpointing of custom SourceFunction

2018-09-13 Thread Aljoscha Krettek
Hi Ken,

you can use the (slightly misnamed for this purpose) 
AbstractStreamOperatorTestHarness. It's used in the Flink codebase for unit 
testing sources.
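
A rough sketch of that usage, assuming a hypothetical MySource that implements SourceFunction[String] and ListCheckpointed (the harness ships in the flink-streaming-java test-jar):

import org.apache.flink.streaming.api.operators.StreamSource
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness

val source = new MySource() // hypothetical custom source under test
val harness = new AbstractStreamOperatorTestHarness[String](
  new StreamSource(source), 1, 1, 0) // maxParallelism, parallelism, subtask index
harness.setup()
harness.open()

// Drive the source (typically from a separate thread), then take a checkpoint:
val snapshot = harness.snapshot(0L, 0L)

// A second harness around a fresh instance can restore the snapshot to
// verify the checkpointed state:
// restoredHarness.initializeState(snapshot); restoredHarness.open()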

Best,
Aljoscha

> On 12. Sep 2018, at 23:37, Ken Krugler  wrote:
> 
> Hi all,
> 
> We’re using the (Keyed)(One|Two)InputStreamOperatorTestHarness classes to 
> test checkpointing of some custom functions.
> 
> But in looking through the Flink source, I didn’t see anything comparable for 
> testing a custom SourceFunction (which implements the ListCheckpointed 
> interface).
> 
> What’s the recommended approach for this?
> 
> We can of course fire up the workflow with checkpointing, and add some 
> additional logic that kills the job after a checkpoint has happened, etc.
> 
> But it seems like there should be a better way.
> 
> Thanks,
> 
> — Ken
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 



Test harness for validating proper checkpointing of custom SourceFunction

2018-09-12 Thread Ken Krugler
Hi all,

We’re using the (Keyed)(One|Two)InputStreamOperatorTestHarness classes to test 
checkpointing of some custom functions.

But in looking through the Flink source, I didn’t see anything comparable for 
testing a custom SourceFunction (which implements the ListCheckpointed 
interface).

What’s the recommended approach for this?

We can of course fire up the workflow with checkpointing, and add some 
additional logic that kills the job after a checkpoint has happened, etc.

But it seems like there should be a better way.

Thanks,

— Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: ClassNotFoundException in custom SourceFunction

2017-12-09 Thread romain.jln
Hi, 

The problem is that most of the exceptions appear when my job has been
running for some hours. 

The only way for me to reproduce some of those errors is by using the web UI
and hitting the cancel button of my job. So if I can find a way to generate
this action locally in a test, maybe I can use a debugger to see where the
code is invoking the ClassLoader.

But in the case of the stack trace I sent you, when going into the source
code of the Eventhub, it turns out that the class causing the exception is
actually an anonymous class implementing the standard interface Runnable.
The cluster is running on a linux distribution, so I thought at first that,
as linux is flushing from time to time the tmp file, it could be the reason.
But since then, I restarted the cluster with the configuration
taskmanager.tmp.dirs and jobmanager.web.tmpdir set to another directory and
it seems that it did not solve the issue.

Best,
Romain



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread Aljoscha Krettek
Hi,

Is it possible to go in there with a debugger and see where exactly the code is 
invoking the ClassLoader?
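
Short of a full debugger session, a simple logging probe dropped into the suspicious call sites can answer the same question (a diagnostic sketch with illustrative names, not production code):

def logLoader(where: String): Unit = {
  val t = Thread.currentThread()
  // In the logs, compare FlinkUserCodeClassLoader vs. sun.misc.Launcher$AppClassLoader.
  println(s"$where: thread=${t.getName}, contextClassLoader=${t.getContextClassLoader}")
}

// e.g. call logLoader("open"), logLoader("run") and logLoader("cancel")
// at the top of the corresponding SourceFunction methods.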

Best,
Aljoscha

> On 8. Dec 2017, at 14:13, romain.jln  wrote:
> 
> Hi,
> 
> FYI, I edited my message on the Nabble archive website because I realised I
> sent the wrong stack trace at first (but I don't know if you've noticed the
> modification). The first one was actually related to a custom Sink function
> that sends data to the Eventhub (not sure whether they are related issues or
> not)
> 
> The one related to the source is the following:
> 
> Exception in thread "Thread-2" java.lang.NoClassDefFoundError:
> com/microsoft/azure/eventhubs/MessageReceiver$10 
>at
> com.microsoft.azure.eventhubs.MessageReceiver.scheduleLinkCloseTimeout(MessageReceiver.java:574)
>  
>at
> com.microsoft.azure.eventhubs.MessageReceiver.onClose(MessageReceiver.java:641)
>  
>at
> com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78) 
>at
> com.microsoft.azure.eventhubs.PartitionReceiver.onClose(PartitionReceiver.java:391)
>  
>at
> com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78) 
>at
> com.microsoft.azure.eventhubs.ClientEntity.closeSync(ClientEntity.java:93) 
>at
> commons.source.azure.eventhub.PartitionPoller.shutdown(PartitionPoller.java:80)
>  
>at
> commons.source.azure.eventhub.PartitionPoller.run(PartitionPoller.java:55) 
> Caused by: java.lang.ClassNotFoundException:
> com.microsoft.azure.eventhubs.MessageReceiver$10 
>at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
>at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
>at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
>... 8 more 
> 
> 
> I don't know if I exactly understand what you mean by "trying to use the
> Thread Context ClassLoader" (I do not have deep knowledge of
> ClassLoaders), but I am not explicitly calling any method on the Thread
> Context ClassLoader, and going through the different methods of the stack
> trace I did not notice any such call either.
> 
> As I was explaining at first, within the source I basically create a thread
> that is constantly polling messages from the Eventhub (like the
> KafkaConsumerThread does with Kafka) using Microsoft's library. Under the
> hood, Microsoft uses a ThreadPoolExecutor and the Proton library.
> 
> The whole source works fine except that I get those ClassNotFoundException
> from time to time with no real apparent reason.
> 
> I use Flink Version: 1.3.2
> 
> Best,
> Romain
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread romain.jln
Hi,

FYI, I edited my message on the Nabble archive website because I realised I
sent the wrong stack trace at first (but I don't know if you've noticed the
modification). The first one was actually related to a custom Sink function
that sends data to the Eventhub (not sure whether they are related issues or
not)

The one related to the source is the following:

Exception in thread "Thread-2" java.lang.NoClassDefFoundError:
com/microsoft/azure/eventhubs/MessageReceiver$10 
at
com.microsoft.azure.eventhubs.MessageReceiver.scheduleLinkCloseTimeout(MessageReceiver.java:574)
 
at
com.microsoft.azure.eventhubs.MessageReceiver.onClose(MessageReceiver.java:641) 
at
com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78) 
at
com.microsoft.azure.eventhubs.PartitionReceiver.onClose(PartitionReceiver.java:391)
 
at
com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78) 
at
com.microsoft.azure.eventhubs.ClientEntity.closeSync(ClientEntity.java:93) 
at
commons.source.azure.eventhub.PartitionPoller.shutdown(PartitionPoller.java:80) 
at
commons.source.azure.eventhub.PartitionPoller.run(PartitionPoller.java:55) 
Caused by: java.lang.ClassNotFoundException:
com.microsoft.azure.eventhubs.MessageReceiver$10 
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
... 8 more 


I don't know if I exactly understand what you mean by "trying to use the
Thread Context ClassLoader" (I do not have deep knowledge of
ClassLoaders), but I am not explicitly calling any method on the Thread
Context ClassLoader, and going through the different methods of the stack
trace I did not notice any such call either.

As I was explaining at first, within the source I basically create a thread
that is constantly polling messages from the Eventhub (like the
KafkaConsumerThread does with Kafka) using Microsoft's library. Under the
hood, Microsoft uses a ThreadPoolExecutor and the Proton library.

The whole source works fine except that I get those ClassNotFoundException
from time to time with no real apparent reason.

I use Flink Version: 1.3.2

Best,
Romain



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread Aljoscha Krettek
Hi,

Is the code that is throwing the exception trying to use the Thread Context 
ClassLoader? If yes, that might explain it because a Thread that you create 
will not have the correct ClassLoader set.
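
One way to act on that, sketched with illustrative names (a real source would also hand records from the poller thread to the SourceContext):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class PollingSource extends RichParallelSourceFunction[String] {
  @volatile private var isRunning = true
  @transient private var poller: Thread = _

  override def open(parameters: Configuration): Unit = {
    poller = new Thread(new Runnable {
      override def run(): Unit =
        while (isRunning) { Thread.sleep(50) /* poll the external system here */ }
    })
    // Spawned threads inherit the creating thread's context classloader, which
    // is not necessarily the FlinkUserCodeClassLoader; set it explicitly.
    poller.setContextClassLoader(getRuntimeContext.getUserCodeClassLoader)
    poller.start()
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit =
    while (isRunning) Thread.sleep(100) // in real code, drain a queue into ctx.collect(...)

  override def cancel(): Unit = isRunning = false
}

This only covers threads the user code spawns itself; threads created inside a third-party library would need the same treatment wherever they are created.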

Best,
Aljoscha

> On 8. Dec 2017, at 12:24, Fabian Hueske  wrote:
> 
> Hi,
> 
> thanks a lot for investigating this problem and for the results you shared.
> This looks like a bug to me. I'm CCing Aljoscha who knows the internals of 
> the DataStream API very well.
> 
> Which Flink version are you using?
> 
> Would you mind creating a JIRA issue [1] with all the info you provided so 
> far?
> 
> Thank you,
> Fabian
> 
> [1] https://issues.apache.org/jira/projects/FLINK/summary 
> 
> 
> 2017-12-08 11:27 GMT+01:00 romain.jln:
> Hi,
> 
> The stack trace is usually something like :
> 
> Exception in thread "Thread-49" java.lang.NoClassDefFoundError:
> com/microsoft/azure/eventhubs/amqp/AmqpErrorCode
> at
> com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30)
> at
> com.microsoft.azure.eventhubs.MessageSender.onClose(MessageSender.java:376)
> at
> com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:76)
> at
> com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:47)
> at 
> org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
> at
> org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
> at
> org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
> at
> org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
> at
> com.microsoft.azure.eventhubs.MessagingFactory$RunReactor.run(MessagingFactory.java:404)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> com.microsoft.azure.eventhubs.amqp.AmqpErrorCode
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 10 more
> 
> (those messages are appearing randomly in the stdout of the task managers)
> 
> For a little bit of context about this stack trace: it is related to a
> custom implementation of a Flink Source that connects to an Azure Eventhub.
> When starting an Eventhub client, the Eventhub library creates a Reactor
> thread for managing the AMQP messages (proton library). This thread is
> created in the Open function of the custom source.
> 
> I checked the fat jar that I am uploading to Flink using the web API and the
> given class is correctly located at the given path.
> 
> It is not always the same class that is missing. It can also be
> com.microsoft.azure.eventhubs.ExceptionUtil,
> com.microsoft.azure.eventhubs.MessageReceiver$10, or other classes of the
> same package. All of those classes are correctly located in the fat jar.
> 
> I kept on investigating the issue and here are the first results I got:
> 
> Using Thread.currentThread().getContextClassLoader(), I can see that, when
> manually cancelling the job (via the web API), the class of the ClassLoader
> is sun.misc.Launcher$AppClassLoader instead of FlinkUserCodeClassLoader
> (which can explain some of the ClassNotFoundException)
> 
> However, when Flink automatically cancels the source (because of an error
> during the execution of the job), it correctly uses a
> FlinkUserCodeClassLoader as expected.
> 
> When checking the ClassLoader of the thread during the call to the Open
> method of the source, it also correctly uses a FlinkUserCodeClassLoader.
> 
> But I still keep on getting some ClassNotFoundException from time to time
> for no apparent reason to me.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 



Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread Fabian Hueske
Hi,

thanks a lot for investigating this problem and for the results you shared.
This looks like a bug to me. I'm CCing Aljoscha who knows the internals of
the DataStream API very well.

Which Flink version are you using?

Would you mind creating a JIRA issue [1] with all the info you provided so
far?

Thank you,
Fabian

[1] https://issues.apache.org/jira/projects/FLINK/summary

2017-12-08 11:27 GMT+01:00 romain.jln :

> Hi,
>
> The stack trace is usually something like :
>
> Exception in thread "Thread-49" java.lang.NoClassDefFoundError:
> com/microsoft/azure/eventhubs/amqp/AmqpErrorCode
> at com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30)
> at com.microsoft.azure.eventhubs.MessageSender.onClose(MessageSender.java:376)
> at com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:76)
> at com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:47)
> at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
> at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
> at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
> at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
> at com.microsoft.azure.eventhubs.MessagingFactory$RunReactor.run(MessagingFactory.java:404)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> com.microsoft.azure.eventhubs.amqp.AmqpErrorCode
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 10 more
>
> (those messages are appearing randomly in the stdout of the task managers)
>
> For a little bit of context about this stack trace: it is related to a
> custom implementation of a Flink Source that connects to an Azure Eventhub.
> When starting an Eventhub client, the Eventhub library creates a Reactor
> thread for managing the AMQP messages (proton library). This thread is
> created in the Open function of the custom source.
>
> I checked the fat jar that I am uploading to Flink using the web API and
> the
> given class is correctly located at the given path.
>
> It is not always the same class that is missing. It can also be
> com.microsoft.azure.eventhubs.ExceptionUtil,
> com.microsoft.azure.eventhubs.MessageReceiver$10, or other classes of the
> same package. All of those classes are correctly located in the fat jar.
>
> I kept on investigating the issue and here are the first results I got:
>
> Using Thread.currentThread().getContextClassLoader(), I can see that, when
> manually cancelling the job (via the web API), the class of the ClassLoader
> is sun.misc.Launcher$AppClassLoader instead of FlinkUserCodeClassLoader
> (which can explain some of the ClassNotFoundException)
>
> However, when Flink automatically cancels the source (because of an error
> during the execution of the job), it correctly uses a
> FlinkUserCodeClassLoader as expected.
>
> When checking the ClassLoader of the thread during the call to the Open
> method of the source, it also correctly uses a FlinkUserCodeClassLoader.
>
> But I still keep on getting some ClassNotFoundException from time to time
> for no apparent reason to me.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread romain.jln
Hi,

The stack trace is usually something like :

Exception in thread "Thread-49" java.lang.NoClassDefFoundError:
com/microsoft/azure/eventhubs/amqp/AmqpErrorCode
at
com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30)
at
com.microsoft.azure.eventhubs.MessageSender.onClose(MessageSender.java:376)
at
com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:76)
at
com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:47)
at 
org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
at
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
at
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
at
com.microsoft.azure.eventhubs.MessagingFactory$RunReactor.run(MessagingFactory.java:404)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
com.microsoft.azure.eventhubs.amqp.AmqpErrorCode
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 10 more

(those messages are appearing randomly in the stdout of the task managers)

For a little bit of context about this stack trace: it is related to a
custom implementation of a Flink Source that connects to an Azure Eventhub.
When starting an Eventhub client, the Eventhub library creates a Reactor
thread for managing the AMQP messages (proton library). This thread is
created in the Open function of the custom source.

I checked the fat jar that I am uploading to Flink using the web API and the
given class is correctly located at the given path.

It is not always the same class that is missing. It can also be
com.microsoft.azure.eventhubs.ExceptionUtil,
com.microsoft.azure.eventhubs.MessageReceiver$10, or other classes of the
same package. All of those classes are correctly located in the fat jar.

I kept investigating the issue, and here are the first results I got:

Using Thread.currentThread().getContextClassLoader(), I can see that, when
manually cancelling the job (via the web API), the class of the ClassLoader
is sun.misc.Launcher$AppClassLoader instead of FlinkUserCodeClassLoader
(which can explain some of the ClassNotFoundExceptions).

However, when Flink automatically cancels the source (because of an error
during the execution of the job), it correctly uses a
FlinkUserCodeClassLoader as expected. 

When checking the ClassLoader of the thread during the call to the Open
method of the source, it also correctly uses a FlinkUserCodeClassLoader. 

But I still keep getting ClassNotFoundExceptions from time to time, for no
reason apparent to me.
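
In case it helps anyone hitting the same issue, the workaround I am
currently testing is to capture the user-code classloader in open() and
re-install it in cancel() before touching any library classes. A minimal
sketch, where startEventhubClient/stopEventhubClient are placeholders, not
the real Eventhub API:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Sketch only: the Eventhub start/stop calls below are placeholders.
class EventhubSource extends RichSourceFunction[String] {

  @transient private var userCodeClassLoader: ClassLoader = _

  override def open(parameters: Configuration): Unit = {
    // open() runs on a task thread whose context classloader is the
    // FlinkUserCodeClassLoader; remember it for later
    userCodeClassLoader = Thread.currentThread().getContextClassLoader
    startEventhubClient()
  }

  override def run(ctx: SourceContext[String]): Unit = {
    // ... emit records via ctx.collect(...) ...
  }

  override def cancel(): Unit = {
    // cancel() may be invoked from a thread carrying the AppClassLoader;
    // temporarily switch to the user-code classloader before the library
    // loads any further classes
    val previous = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(userCodeClassLoader)
    try stopEventhubClient()
    finally Thread.currentThread().setContextClassLoader(previous)
  }

  private def startEventhubClient(): Unit = () // placeholder
  private def stopEventhubClient(): Unit = ()  // placeholder
}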





Re: ClassNotFoundException in custom SourceFunction

2017-12-07 Thread Fabian Hueske
Hi,

A ClassNotFoundException should not be expected behavior.
Can you post the stacktrace of the exception?

We had a few issues in the past where Flink didn't use the correct
classloader, so this would not be an unusual bug.
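
To narrow it down in the meantime, a quick diagnostic you could try (a
sketch, untested): a dummy source that prints the context classloader in
each callback, so you can compare what cancel() runs with against open().

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Diagnostic sketch: print which classloader each source callback runs with.
class ClassLoaderProbe extends RichSourceFunction[String] {

  @volatile private var running = true

  override def open(parameters: Configuration): Unit =
    println(s"open():   ${Thread.currentThread().getContextClassLoader}")

  override def run(ctx: SourceContext[String]): Unit =
    while (running) Thread.sleep(100)

  override def cancel(): Unit = {
    println(s"cancel(): ${Thread.currentThread().getContextClassLoader}")
    running = false
  }
}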

Thanks,
Fabian

2017-12-07 10:44 GMT+01:00 Tugdual Grall :

> ok
>
> On Thu, Dec 7, 2017 at 10:35 AM, romain.jln  wrote:
>
>> Hi all,
>>
>> I am experiencing some problems with a custom source that I have
>> implemented. I am getting ClassNotFoundExceptions randomly during the
>> execution of the job, even though the fat jar submitted to Flink contains
>> the classes in question.
>>
>> After several hours of investigation, I think I have been able to narrow
>> down the potential cause.
>>
>> Currently, it seems that the thread executing the cancel() method of my
>> source function is not using the normal FlinkUserCodeClassLoader but
>> rather the AppClassLoader, which seems to cause those
>> ClassNotFoundExceptions.
>>
>> I wanted to know if this behaviour was expected or if it was actually a
>> bug.
>>
>> Thanks !
>>
>>
>>
>>
>
>


Re: ClassNotFoundException in custom SourceFunction

2017-12-07 Thread Tugdual Grall
ok

On Thu, Dec 7, 2017 at 10:35 AM, romain.jln  wrote:

> Hi all,
>
> I am experiencing some problems with a custom source that I have
> implemented. I am getting ClassNotFoundExceptions randomly during the
> execution of the job, even though the fat jar submitted to Flink contains
> the classes in question.
>
> After several hours of investigation, I think I have been able to narrow
> down the potential cause.
>
> Currently, it seems that the thread executing the cancel() method of my
> source function is not using the normal FlinkUserCodeClassLoader but
> rather the AppClassLoader, which seems to cause those
> ClassNotFoundExceptions.
>
> I wanted to know if this behaviour was expected or if it was actually a
> bug.
>
> Thanks !
>
>
>


ClassNotFoundException in custom SourceFunction

2017-12-07 Thread romain.jln
Hi all,

I am experiencing some problems with a custom source that I have
implemented. I am getting ClassNotFoundExceptions randomly during the
execution of the job, even though the fat jar submitted to Flink contains
the classes in question.

After several hours of investigation, I think I have been able to narrow
down the potential cause.

Currently, it seems that the thread executing the cancel() method of my
source function is not using the normal FlinkUserCodeClassLoader but rather
the AppClassLoader, which seems to cause those ClassNotFoundExceptions.

I wanted to know if this behaviour was expected or if it was actually a bug.

Thanks !





Re: Freeing resources in SourceFunction

2016-11-03 Thread Maximilian Michels
For your use case you should use the close() method, which is always
called upon shutdown of your source. cancel() is only called when you
explicitly cancel your job.
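
For illustration, a minimal sketch of that split, where ConnectionPool is
a hypothetical stand-in for the resource your source manages:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// ConnectionPool is a stand-in for whatever resource the source manages.
trait ConnectionPool {
  def fetchNext(): String
  def close(): Unit
}

// The pool is built in open() on the task manager from a serializable
// factory, so the pool itself never has to be serialized.
class PooledSource(createPool: () => ConnectionPool)
    extends RichSourceFunction[String] {

  @transient private var pool: ConnectionPool = _
  @volatile private var running = true

  override def open(parameters: Configuration): Unit =
    pool = createPool()

  override def run(ctx: SourceContext[String]): Unit =
    while (running) {
      ctx.collect(pool.fetchNext())
    }

  // cancel() only flips the flag so that run() can exit promptly
  override def cancel(): Unit =
    running = false

  // close() is invoked on every shutdown path, so the pool is released here
  override def close(): Unit =
    if (pool != null) pool.close()
}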

-Max


On Thu, Nov 3, 2016 at 2:45 PM, Yury Ruchin  wrote:
> Hello,
>
> I'm writing a custom source function for my streaming job. The source
> function manages some connection pool. I want to close that pool once my job
> is "finished" (since the stream is unbounded, the only way I see is to
> cancel the streaming job). Since I inherit RichSourceFunction, there are two
> candidates: cancel() and close(). I'm wondering which one should be picked.
> Looking for best practices, I turned to the existing sources. One example
> is FlinkKafkaConsumerBase, which has both callbacks implemented identically
> (one delegating to the other). A counterexample is InputFormatSourceFunction,
> which uses cancel() only to reset a flag, while the actual cleanup is done
> in close(). Which of these approaches is a better fit in the described case?
>
> Just FYI, Flink version I use is 1.1.2.
>
> Thanks,
> Yury


Freeing resources in SourceFunction

2016-11-03 Thread Yury Ruchin
Hello,

I'm writing a custom source function for my streaming job. The source
function manages some connection pool. I want to close that pool once my
job is "finished" (since the stream is unbounded, the only way I see is to
cancel the streaming job). Since I inherit RichSourceFunction, there are
two candidates: cancel() and close(). I'm wondering which one should be
picked. Looking for best practices, I turned to the existing sources. One
example is FlinkKafkaConsumerBase, which has both callbacks implemented
identically (one delegating to the other). A counterexample is
InputFormatSourceFunction, which uses cancel() only to reset a flag, while
the actual cleanup is done in close(). Which of these approaches is a better
fit in the described case?

Just FYI, Flink version I use is 1.1.2.

Thanks,
Yury


Re: SourceFunction Scala

2016-03-12 Thread Stefano Baghino
Hi Ankur,

I'm catching up with this week's mailing list right now; I hope you have
already solved the issue, but if you haven't, this kind of problem happens
when you use a version of Scala that your Flink dependencies were not
compiled for. Make sure you append the correct Scala version suffix to the
dependencies, matching the Scala version you are using for your project.

You can find more details here:
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
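
For example, with sbt (a sketch, versions are illustrative), the %%
operator appends your project's Scala binary version to the artifact name,
while with a plain % you have to spell the suffix out yourself:

// build.sbt (sketch; versions are illustrative)
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // %% resolves to flink-streaming-scala_2.11 for the scalaVersion above
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0",
  // with a plain %, the Scala suffix must be written explicitly
  "org.apache.flink" % "flink-connector-rabbitmq_2.11" % "1.0.0"
)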

On Mon, Mar 7, 2016 at 1:19 PM, Ankur Sharma <an...@stud.uni-saarland.de>
wrote:

> Hi,
>
>
> I am getting the following error while executing the fat jar of the
> project. Any help?
>
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/util/serialization/DeserializationSchema
> at org.mpi.debs.Main.main(Main.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.util.serialization.DeserializationSchema
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 1 more
>
>
> Main.scala:
>
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>
>
> object Main {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.createLocalEnvironment(1)
>     val stream = env.addSource(
>       new RMQSource[String]("localhost", "query-one", new SimpleStringSchema))
>     stream.addSink(new SinkFunction[String] {
>       override def invoke(value: String) = {
>         println(value)
>       }
>     })
>     env.execute("QueryOneExecutor")
>   }
> }
>
> Best,
> Ankur Sharma
>
> On 06 Mar 2016, at 20:34, Márton Balassi <balassi.mar...@gmail.com> wrote:
>
> Hey Ankur,
>
> Add the following line to your imports, and have a look at the referenced
> FAQ. [1]
>
> import org.apache.flink.streaming.api.scala._
>
> [1]
> https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters
>
> Best,
>
> Marton
>
> On Sun, Mar 6, 2016 at 8:17 PM, Ankur Sharma <an...@stud.uni-saarland.de>
> wrote:
>
>> Hello,
>>
>> I am trying to use a custom source function (declaration given below) for
>> DataStream.
>> if I add the source to stream using add source:
>>
>> val stream = env.addSource(new QueryOneSource(args))
>>
>> I get the following error. Any explanations and help?
>>
>>
>> Error:(14, 31) could not find implicit value for evidence parameter of type
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
>> val stream = env.addSource(new QueryOneSource(args))
>>
>> Error:(14, 31) not enough arguments for method addSource: (implicit
>> evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit
>> evidence$16:
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
>> Unspecified value parameter evidence$16.
>> val stream = env.addSource(new QueryOneSource(args))
>>
>>
>> class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
>>
>>   var nextTuple: Tuple = _ // case class Tuple(id: Long, value: Int)
>>
>>   override def run(ctx: SourceContext[Tuple]) = {
>>     while (true) {
>>       nextRecord()
>>       ctx.collect(this.nextTuple)
>>     }
>>   }
>>
>>   override def cancel() = { }
>>
>>   def nextRecord() = {
>>     // read the next record from the input files into nextTuple
>>   }
>> }
>>
>> Best,
>> Ankur Sharma
>> Information Systems Group
>> 3.15 E1.1 Universität des Saarlandes
>> 66123, Saarbrücken Germany
>> Email: ankur.sha...@mpi-inf.mpg.de
>> an...@stud.uni-saarland.de
>>
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: SourceFunction Scala

2016-03-07 Thread Ankur Sharma
Hi, 


I am getting the following error while executing the fat jar of the project. Any help?


Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/util/serialization/DeserializationSchema
at org.mpi.debs.Main.main(Main.scala)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.util.serialization.DeserializationSchema
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 1 more


Main.scala: 

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val stream = env.addSource(
      new RMQSource[String]("localhost", "query-one", new SimpleStringSchema))
    stream.addSink(new SinkFunction[String] {
      override def invoke(value: String) = {
        println(value)
      }
    })
    env.execute("QueryOneExecutor")
  }
}
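
In case it is relevant, the jar is assembled with sbt-assembly along these
lines (sketched here, since I do not have the exact build file at hand;
versions are illustrative); could the problem be that the Flink
dependencies are not ending up in the fat jar?

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

// build.sbt (sketch; versions are illustrative)
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // these must not be marked "provided", or they are left out of the fat jar
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0",
  "org.apache.flink" % "flink-connector-rabbitmq_2.11" % "1.0.0"
)
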
Best,
Ankur Sharma

> On 06 Mar 2016, at 20:34, Márton Balassi <balassi.mar...@gmail.com> wrote:
> 
> Hey Ankur,
> 
> Add the following line to your imports, and have a look at the referenced 
> FAQ. [1]
> 
> import org.apache.flink.streaming.api.scala._
> 
> [1]
> https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters
> 
> Best,
> 
> Marton
> 
> On Sun, Mar 6, 2016 at 8:17 PM, Ankur Sharma <an...@stud.uni-saarland.de> wrote:
> Hello,
> 
> I am trying to use a custom source function (declaration given below) for 
> DataStream.
> if I add the source to stream using add source: 
> 
> val stream = env.addSource(new QueryOneSource(args))
> I get the following error. Any explanations and help?
> 
> Error:(14, 31) could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
> val stream = env.addSource(new QueryOneSource(args))
>
> Error:(14, 31) not enough arguments for method addSource: (implicit
> evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit
> evidence$16:
> org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
> Unspecified value parameter evidence$16.
> val stream = env.addSource(new QueryOneSource(args))
> 
> class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
>
>   var nextTuple: Tuple = _ // case class Tuple(id: Long, value: Int)
>
>   override def run(ctx: SourceContext[Tuple]) = {
>     while (true) {
>       nextRecord()
>       ctx.collect(this.nextTuple)
>     }
>   }
>
>   override def cancel() = { }
>
>   def nextRecord() = {
>     // read the next record from the input files into nextTuple
>   }
> }
> 
> Best,
> Ankur Sharma
> Information Systems Group
> 3.15 E1.1 Universität des Saarlandes
> 66123, Saarbrücken Germany
> Email: ankur.sha...@mpi-inf.mpg.de
> an...@stud.uni-saarland.de
> 


