Custom source function in PyFlink

2023-08-28 Thread Őrhidi Mátyás
Hey folks,

I'm looking for an example of creating a custom source in PyFlink. The one
that I found in the tests is a wrapper around a Java class:

def test_add_custom_source(self):
    custom_source = SourceFunction(
        "org.apache.flink.python.util.MyCustomSourceFunction")
    ds = self.env.add_source(
        custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(self.test_sink)
    self.env.execute("test add custom source")
    results = self.test_sink.get_results(False)
    expected = [
        '+I[3, Mike]',
        '+I[1, Marry]',
        '+I[4, Ted]',
        '+I[5, Jack]',
        '+I[0, Bob]',
        '+I[2, Henry]']
    results.sort()
    expected.sort()
    self.assertEqual(expected, results)

Is this the only way as of today?

Regards,
Matyas
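
For reference, the Java class wrapped above lives in Flink's test sources, so
its code is not part of the public API. A minimal sketch of what an equivalent
SourceFunction could look like (the emitted rows are inferred from the expected
output in the test; the class body itself is an assumption):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

public class MyCustomSourceFunction implements SourceFunction<Row> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) {
        // Emit (id, name) rows matching the expected test output above.
        String[] names = {"Bob", "Marry", "Henry", "Mike", "Ted", "Jack"};
        for (int i = 0; i < names.length && running; i++) {
            ctx.collect(Row.of(i, names[i]));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Such a class is packaged into a jar on the PyFlink job's classpath and then
referenced by its fully qualified name from SourceFunction(...) as in the test.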


Re: Flink TCP socket custom source - savepoint cannot be taken

2023-07-03 Thread Martijn Visser
Hi Kamal,

There's no such limitation, so most likely this is related to the
implementation of your TCP source connector. Do keep in mind that just by
the nature of TCP, I doubt that you will have any guarantees when it comes
to this source. E.g. if you roll back to a savepoint from one day ago, how
will you be able to retrieve data from your TCP source that's exactly
the same as it was a day ago?

Best regards,

Martijn

On Sat, Jul 1, 2023 at 7:33 AM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> I have a requirement to read data arriving over a TCP socket stream, and for
> this I have written a custom source function that reads data from a TCP socket.
>
>
>
> The job runs successfully, but when trying to take a savepoint, an error
> says the savepoint cannot be taken.
>
>
>
> Is there any limitation that a TCP stream's state can't be captured (by
> checkpoint or savepoint) and restored later?
>
>
>
> Rgds,
>
> Kamal
>
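
To make Martijn's point concrete: a source can only participate meaningfully in
checkpoints and savepoints if it has replayable position state to snapshot and
restore. A rough sketch of that pattern (readRecordAt is a hypothetical helper;
a plain TCP stream offers no offset like this to rewind to):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class OffsetBasedSource implements SourceFunction<String>, CheckpointedFunction {

    private volatile boolean running = true;
    private transient ListState<Long> offsetState;
    private long offset;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // Emitting and advancing the offset under the checkpoint lock
            // keeps the snapshot consistent with what was emitted.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(readRecordAt(offset));
                offset++;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetState.clear();
        offsetState.add(offset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("offset", Long.class));
        for (Long restored : offsetState.get()) {
            offset = restored; // resume from the restored position
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Hypothetical helper: requires storage that can be re-read by position,
    // which is exactly what a raw TCP stream cannot provide.
    private String readRecordAt(long offset) {
        return "record-" + offset;
    }
}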


Re: Flink TCP custom source - secured server socket

2023-07-02 Thread Martin
This is a conclusion I also came to while implementing an application receiving 
events via TCP/HTTP.

This also brings the benefit, as already said, that you can use the modern 
ecosystem for implementing such things and don't need to do it yourself.

I summarized my conclusions from developing my application in this paper:
http://dx.doi.org/10.1145/3579142.3594292

Best regards,
Martin


On 2 July 2023 at 09:13:14 MESZ, "Jan Lukavský" wrote:
>Hi,
>
>With a plain TCP socket, there is no 'state' you can return to when restoring 
>from a checkpoint. All you can do is reopen the socket. You would have to 
>ensure fault tolerance via some (custom) higher-level protocol you implement, 
>including persistent storage, replication, etc. You will soon see you are 
>actually implementing a commit log like Apache Kafka, so it might be better to 
>use Kafka directly. If your clients need to send data over a plain TCP socket, 
>you can receive it over TCP (with a custom TCP server application), store it in 
>Kafka and use Kafka as the data source for your Flink application.
>
>Best,
>
> Jan
>
>On 7/1/23 05:20, Kamal Mittal wrote:
>> 
>> Hello,
>> 
>> Thanks for your suggestion but please confirm below.
>> 
>> Is it the case that a TCP socket source job can’t be restored from the last 
>> checkpoint?
>> 
>> Rgds,
>> 
>> Kamal
>> 
>> *From:*Jan Lukavský 
>> *Sent:* 29 June 2023 06:18 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Flink TCP custom source - secured server socket
>> 
>> > ... a state backward in (processing) time ...
>> (of course not processing, I meant to say event time)
>> 
>> On 6/29/23 14:45, Jan Lukavský wrote:
>> 
>> Hi Kamal,
>> 
>> you probably have several options:
>> 
>>  a) bundle your private key and certificate into your Flink
>> application's jar (not recommended; your service's private key
>> will then not be exactly "private")
>>  b) create a service which provides a certificate for your
>> service at runtime (e.g. ACME-based or similar)
>> 
>> I have a different note, though. Flink (or, for that matter, any
>> streaming engine; I'm more focused on Apache Beam) heavily relies
>> on the ability of sources to restore a state backward in
>> (processing) time. That is definitely not the case for a plain TCP
>> socket. It is likely you will experience data-loss issues with
>> this solution (regardless of security). This might be okay for
>> you, I just felt it would be good to stress this point.
>> 
>> Best,
>> 
>>  Jan
>> 
>> On 6/29/23 12:53, Kamal Mittal via user wrote:
>> 
>> Hello Community,
>> 
>> I have created a custom TCP stream source and am reading data
>> from it.
>> 
>> But this TCP connection needs to be secured, i.e. SSL-based. The
>> question is how to configure/provide certificates via Flink for a
>> secured client-server TCP connection?
>> 
>> Rgds,
>> 
>> Kamal
>>

Re: Flink TCP custom source - secured server socket

2023-07-02 Thread Jan Lukavský

Hi,

With a plain TCP socket, there is no 'state' you can return to when 
restoring from a checkpoint. All you can do is reopen the socket. You 
would have to ensure fault tolerance via some (custom) higher-level 
protocol you implement, including persistent storage, replication, etc. 
You will soon see you are actually implementing a commit log like Apache 
Kafka, so it might be better to use Kafka directly. If your clients need 
to send data over a plain TCP socket, you can receive it over TCP (with a 
custom TCP server application), store it in Kafka and use Kafka as the 
data source for your Flink application.


Best,

 Jan

On 7/1/23 05:20, Kamal Mittal wrote:


Hello,

Thanks for your suggestion but please confirm below.

Is it the case that a TCP socket source job can’t be restored from the last 
checkpoint?


Rgds,

Kamal

*From:*Jan Lukavský 
*Sent:* 29 June 2023 06:18 PM
*To:* user@flink.apache.org
*Subject:* Re: Flink TCP custom source - secured server socket

> ... a state backward in (processing) time ...
(of course not processing, I meant to say event time)

On 6/29/23 14:45, Jan Lukavský wrote:

Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink
application's jar (not recommended; your service's private key
will then not be exactly "private")
 b) create a service which provides a certificate for your
service at runtime (e.g. ACME-based or similar)

I have a different note, though. Flink (or, for that matter, any
streaming engine; I'm more focused on Apache Beam) heavily relies
on the ability of sources to restore a state backward in
(processing) time. That is definitely not the case for a plain TCP
socket. It is likely you will experience data-loss issues with
this solution (regardless of security). This might be okay for
you, I just felt it would be good to stress this point.

Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:

Hello Community,

I have created a custom TCP stream source and am reading data
from it.

But this TCP connection needs to be secured, i.e. SSL-based. The
question is how to configure/provide certificates via Flink for a
secured client-server TCP connection?

Rgds,

Kamal
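
A minimal sketch of the bridge Jan describes: a small standalone TCP server
(outside Flink) that persists each incoming line to Kafka, which then serves as
the durable, replayable source for the Flink job. Broker address, port, and
topic name below are assumptions:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TcpToKafkaBridge {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                ServerSocket server = new ServerSocket(9999)) { // assumed port
            while (true) {
                try (Socket client = server.accept();
                        BufferedReader in = new BufferedReader(
                                new InputStreamReader(client.getInputStream()))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        // Kafka provides the commit log that raw TCP lacks; the
                        // Flink job then reads the topic with full
                        // checkpoint/savepoint support.
                        producer.send(new ProducerRecord<>("tcp-events", line));
                    }
                }
            }
        }
    }
}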


Flink TCP socket custom source - savepoint cannot be taken

2023-06-30 Thread Kamal Mittal via user
Hello Community,

I have a requirement to read data arriving over a TCP socket stream, and for 
this I have written a custom source function that reads data from a TCP socket.

The job runs successfully, but when trying to take a savepoint, an error 
says the savepoint cannot be taken.

Is there any limitation that a TCP stream's state can't be captured (by 
checkpoint or savepoint) and restored later?

Rgds,
Kamal


RE: Flink TCP custom source - secured server socket

2023-06-30 Thread Kamal Mittal via user
Hello,

Thanks for your suggestion but please confirm below.

Is it the case that a TCP socket source job can’t be restored from the last 
checkpoint?

Rgds,
Kamal

From: Jan Lukavský 
Sent: 29 June 2023 06:18 PM
To: user@flink.apache.org
Subject: Re: Flink TCP custom source - secured server socket


> ... a state backward in (processing) time ...
(of course not processing, I meant to say event time)
On 6/29/23 14:45, Jan Lukavský wrote:

Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink application's jar 
(not recommended; your service's private key will then not be exactly 
"private")
 b) create a service which provides a certificate for your service at 
runtime (e.g. ACME-based or similar)

I have a different note, though. Flink (or, for that matter, any streaming 
engine; I'm more focused on Apache Beam) heavily relies on the ability of 
sources to restore a state backward in (processing) time. That is definitely 
not the case for a plain TCP socket. It is likely you will experience data-loss 
issues with this solution (regardless of security). This might be okay for you, 
I just felt it would be good to stress this point.

Best,

 Jan
On 6/29/23 12:53, Kamal Mittal via user wrote:
Hello Community,

I have created a custom TCP stream source and am reading data from it.

But this TCP connection needs to be secured, i.e. SSL-based. The question is 
how to configure/provide certificates via Flink for a secured client-server 
TCP connection?

Rgds,
Kamal


Re: Flink TCP custom source - secured server socket

2023-06-29 Thread Jan Lukavský

> ... a state backward in (processing) time ...
(of course not processing, I meant to say event time)

On 6/29/23 14:45, Jan Lukavský wrote:


Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink 
application's jar (not recommended; your service's private key will 
then not be exactly "private")
 b) create a service which provides a certificate for your service 
at runtime (e.g. ACME-based or similar)


I have a different note, though. Flink (or, for that matter, any 
streaming engine; I'm more focused on Apache Beam) heavily relies on 
the ability of sources to restore a state backward in (processing) 
time. That is definitely not the case for a plain TCP socket. It is 
likely you will experience data-loss issues with this solution 
(regardless of security). This might be okay for you, I just felt it 
would be good to stress this point.


Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:


Hello Community,

I have created a custom TCP stream source and am reading data from it.


But this TCP connection needs to be secured, i.e. SSL-based. The question 
is how to configure/provide certificates via Flink for a secured 
client-server TCP connection?


Rgds,

Kamal


Re: Flink TCP custom source - secured server socket

2023-06-29 Thread Jan Lukavský

Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink 
application's jar (not recommended; your service's private key will then 
not be exactly "private")
 b) create a service which provides a certificate for your service 
at runtime (e.g. ACME-based or similar)


I have a different note, though. Flink (or, for that matter, any 
streaming engine; I'm more focused on Apache Beam) heavily relies on the 
ability of sources to restore a state backward in (processing) time. 
That is definitely not the case for a plain TCP socket. It is likely you 
will experience data-loss issues with this solution (regardless of 
security). This might be okay for you, I just felt it would be good to 
stress this point.


Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:


Hello Community,

I have created a custom TCP stream source and am reading data from it.


But this TCP connection needs to be secured, i.e. SSL-based. The question 
is how to configure/provide certificates via Flink for a secured 
client-server TCP connection?


Rgds,

Kamal
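
To make option (a) concrete, a sketch of opening the server socket as an SSL
socket inside a RichSourceFunction, with the keystore bundled as a classpath
resource. The resource name, password, and port are assumptions, and as Jan
notes, bundling the key this way means it is no longer exactly "private":

import java.io.InputStream;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class SecureTcpSource extends RichSourceFunction<String> {

    private transient SSLServerSocket serverSocket;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        // Keystore bundled into the application jar (option a).
        try (InputStream in = getClass().getResourceAsStream("/server-keystore.p12")) {
            keyStore.load(in, "changeit".toCharArray()); // assumed password
        }
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, "changeit".toCharArray());
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(kmf.getKeyManagers(), null, null);
        serverSocket = (SSLServerSocket)
                sslContext.getServerSocketFactory().createServerSocket(9999); // assumed port
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Accept connections and ctx.collect(...) each line, exactly as in
        // the plain TCP version of the source.
        while (running) {
            Thread.sleep(100); // placeholder for the accept()/read loop
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}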


Flink TCP custom source - secured server socket

2023-06-29 Thread Kamal Mittal via user
Hello Community,

I have created a custom TCP stream source and am reading data from it.

But this TCP connection needs to be secured, i.e. SSL-based. The question is 
how to configure/provide certificates via Flink for a secured client-server 
TCP connection?

Rgds,
Kamal


Re: Custom source

2023-03-28 Thread Shammon FY
Thanks Lasse. I think you can create an issue and update the documentation
if anything there is wrong.

Best,
Shammon FY

On Wed, Mar 29, 2023 at 3:48 AM Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:

> Hi.
> I have figured it out. The documentation is wrong in both places.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 28 Mar 2023 at 14:21, Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
> Hi
>
> I have added the code to my test project and using flink 1.16.1
>
> I have two compile problems I don’t understand.
>
> 1. In ChangelogCsvFormat, the cast to producedTypeInfo fails with this
> exception: inconvertible types; cannot cast …. Object to TypeInfo.
>
> 2. In the SocketSourceFunction open method I get “multiple non-overlapping
> abstract methods found in interface org.apache.flink.……
> DeserializationSchema.InitializationContext”
>
>
> Any idea what I’m doing wrong?
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 28 Mar 2023 at 06:07, Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
> Thanks Shammon
>
> A perfect example to look at. Do you also know an example of a data stream
> source?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 28 Mar 2023 at 02:43, Shammon FY wrote:
>
> 
> Hi Lasse
>
> Does your job use Table/SQL or DataStream? Here's the doc [1] for customized
> sources in the Table API, and it includes an example of a socket source.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
>
> Best,
> Shammon FY
>
> On Mon, Mar 27, 2023 at 8:02 PM Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
>> Hi.
>>
>> I have to use data from a REST API that changes very slowly. Instead of doing
>> an async I/O request I would like to create a source function that reads
>> from the REST API so I can connect to and enrich another stream without a
>> lookup for each incoming row.
>> The Flink docs provide an explanation of how to do it, but I find it pretty
>> hard to get all the concepts to work together, and I haven’t been able to find
>> a “simple” app example.
>> Does anyone out there know a good example, or can someone give a simple
>> explanation of best practice for implementing your own source?
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>


Re: Custom source

2023-03-27 Thread Shammon FY
Hi Lasse

Does your job use Table/SQL or DataStream? Here's the doc [1] for customized
sources in the Table API, and it includes an example of a socket source.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/

Best,
Shammon FY

On Mon, Mar 27, 2023 at 8:02 PM Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:

> Hi.
>
> I have to use data from a REST API that changes very slowly. Instead of doing
> an async I/O request I would like to create a source function that reads
> from the REST API so I can connect to and enrich another stream without a
> lookup for each incoming row.
> The Flink docs provide an explanation of how to do it, but I find it pretty
> hard to get all the concepts to work together, and I haven’t been able to
> find a “simple” app example.
> Does anyone out there know a good example, or can someone give a simple
> explanation of best practice for implementing your own source?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Custom source

2023-03-27 Thread Lasse Nedergaard
Hi. 

I have to use data from a REST API that changes very slowly. Instead of doing an 
async I/O request I would like to create a source function that reads from the 
REST API so I can connect to and enrich another stream without a lookup for 
each incoming row. 
The Flink docs provide an explanation of how to do it, but I find it pretty hard 
to get all the concepts to work together, and I haven’t been able to find a 
“simple” app example. 
Does anyone out there know a good example, or can someone give a simple 
explanation of best practice for implementing your own source?

Med venlig hilsen / Best regards
Lasse Nedergaard
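
One common shape for this use case is a RichSourceFunction that polls the REST
endpoint on a slow interval and emits each snapshot, which can then be broadcast
and connected to the main stream instead of doing a per-record lookup. A rough
sketch; the endpoint URL and poll interval are illustrative:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class SlowRestSource extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://example.com/api/reference-data")) // assumed
                .build();
        while (running) {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            // Emit the raw payload; a downstream map can parse it into POJOs.
            ctx.collect(response.body());
            Thread.sleep(60_000); // slow-changing data: poll once a minute
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The emitted snapshots can be turned into broadcast state and joined against the
main stream in a BroadcastProcessFunction, avoiding a lookup per incoming row.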



Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Krzysztof Chmielewski
Thanks Fabian,
I was looking forward to using the unified Source interface in my use case.
The implementation was very intuitive with this new design.

I will try with TableFunction then.

Best.
Krzysztof Chmielewski

On Fri, 5 Nov 2021 at 14:20, Fabian Paul wrote:

> Hi Krzysztof,
>
> The blog post is not building a lookup source but only a scan source. For
> scan sources you can choose between the old RichSourceFunction
> or the new unified Source interface.
>
> For lookup sources you need to implement either a TableFunction or an
> AsyncTableFunction; there are currently, afaik, no other options.
>
> Best,
> Fabian


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Fabian Paul
Hi Krzysztof,

The blog post is not building a lookup source but only a scan source. For scan 
sources you can choose between the old RichSourceFunction 
or the new unified Source interface. 

For lookup sources you need to implement either a TableFunction or an 
AsyncTableFunction; there are currently, afaik, no other options.

Best,
Fabian
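
A rough sketch of the shape Fabian describes: a LookupTableSource whose runtime
provider wraps a TableFunction. Class names and the eval logic are placeholders,
not part of any real connector:

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.TableFunction;

public class MyLookupTableSource implements LookupTableSource {

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // Synchronous lookup; AsyncTableFunctionProvider would be the
        // async alternative mentioned above.
        return TableFunctionProvider.of(new MyLookupFunction());
    }

    @Override
    public DynamicTableSource copy() {
        return new MyLookupTableSource();
    }

    @Override
    public String asSummaryString() {
        return "MyLookupTableSource";
    }

    /** Invoked by the planner once per incoming row with the join-key values. */
    public static class MyLookupFunction extends TableFunction<RowData> {
        public void eval(Object... keys) {
            // Fetch matching rows from the external system and emit them.
            collect(GenericRowData.of(keys[0]));
        }
    }
}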

Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Krzysztof Chmielewski
Ok,
I think there is some misunderstanding here.
As presented in [1] for implementing a Custom Source Connector for
Table API and SQL:



*"You first need to have a source connector which can be used in Flink’s
runtime system (...)*

*For complex connectors, you may want to implement the Source interface
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/connector/source/Source.html>
which
gives you a lot of control. For simpler use cases, you can use
the SourceFunction interface
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html>.
There are already a few different implementations of SourceFunction
interfaces for common use cases such as the FromElementsFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html>
class
and the RichSourceFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>
class.*
 "

The examples in [1] implement a simple source based on a RichSourceFunction
that is later used by a TableSource implementing the ScanTableSource
interface, although the LookupTableSource interface is also mentioned there
as a valid choice.
The custom TableSource uses the source connector that was implemented earlier.

Basically I would like to use the same approach from [1] with my custom
source connector (implemented based on the unified Source interface), but
with LookupTableSource instead of ScanTableSource.

In [1] there is no mention of using TableFunction, although the entire
post is based on the RichSourceFunction connector.

[1] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html


Cheers,
Krzysztof

On Fri, 5 Nov 2021 at 11:39, Fabian Paul wrote:

> I think neither Source nor RichSourceFunction are correct in this case.
> You can have a look at the Jdbc lookup source[1][2]. Your function needs to
> implement TableFunction.
>
> Best,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Fabian Paul
I think neither Source nor RichSourceFunction are correct in this case. You can 
have a look at the Jdbc lookup source[1][2]. Your function needs to 
implement TableFunction.

Best,
Fabian

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
 

[2] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java

Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Krzysztof Chmielewski
Thank you Fabian,
what if I rewrote my custom Source to use the old RichSourceFunction
instead of the unified Source interface?
Would it then work as a lookup?

Best,
Krzysztof

On Fri, 5 Nov 2021 at 11:18, Fabian Paul wrote:

> Hi Krzysztof,
>
> The unified Source is meant to be used for the DataStream API and Table
> API. Currently, we do not have a definition of lookup sources in the
> DataStream API; therefore the new sources do not work as lookups, only as
> scan sources.
>
> Maybe in the future we will also define lookups in the DataStream API,
> but there are no concrete plans yet.
>
> Best,
> Fabian


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Fabian Paul
Hi Krzysztof,

The unified Source is meant to be used for the DataStream API and Table API. 
Currently, we do not have a definition of lookup sources in the 
DataStream API; therefore the new sources do not work as lookups, only as scan 
sources.

Maybe in the future we will also define lookups in the DataStream API, but 
there are no concrete plans yet.

Best,
Fabian

Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-05 Thread Krzysztof Chmielewski
BTW,
@Ingo Bürk
You wrote that "*the new, unified Source interface can only work as a scan
source.*"

Is there any special design reason behind it, or is it simply not yet
implemented?

Thanks,
Krzysztof Chmielewski



On Thu, 4 Nov 2021 at 16:27, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Sure,
> I have a connector that uses HTTP REST calls to a 3rd-party system to
> get some data based on a URL and parameters.
>
> Idea is to make it available to Flink SQL in order to use it like
> SELECT * FROM T where t.id = 123
>
> I would like to have two streams, one would be from T, and the second one
> would be from some other place and I would like to join them.
>
>
>
>> On Thu, 4 Nov 2021 at 16:18, Ingo Bürk wrote:
>
>> Hi Krzysztof,
>>
>> the new, unified Source interface can only work as a scan source. Could
>> you maybe elaborate a bit on the connector implementation you have and how
>> you intend to have it work as a lookup source?
>>
>>
>> Best
>> Ingo
>>
>> On Thu, Nov 4, 2021 at 4:11 PM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Thanks Fabian and Ingo,
>>> yes I forgot to add the reference links, so here they are:
>>>
>>> [1]
>>> https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html
>>> [2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part2
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/
>>>
>>> In my case I would really need a LookupTableSource and not
>>> ScanTableSource since my use case and source will get data for given
>>> parameters and I don't need to scan the entire resource.
>>>
>>> Cheers,
>>>
>>> On Thu, 4 Nov 2021 at 15:48, Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I was wondering if it is possible to implement a Source Table connector
>>>> like it is described in [1][2] with custom source that implements a new
>>>> Source interface [3] and not a SourceFunction.
>>>>
>>>> I already have my custom source, but when I try to implement a
>>>> Table Source from LookupTableSource or ScanTableSource as presented
>>>> in [1][2], it seems I need to have a SourceFunction object to be able to
>>>> use ScanRuntimeProvider or LookupRuntimeProvider.
>>>>
>>>> In other words how can I use Source interface implementation in
>>>> TableSource?
>>>>
>>>> Regards,
>>>> Krzysztof Chmielewski
>>>>
>>>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Krzysztof Chmielewski
Sure,
I have a connector that uses HTTP REST calls to a 3rd-party system to get
some data based on a URL and parameters.

Idea is to make it available to Flink SQL in order to use it like
SELECT * FROM T where t.id = 123

I would like to have two streams, one would be from T, and the second one
would be from some other place and I would like to join them.



On Thu, 4 Nov 2021 at 16:18, Ingo Bürk wrote:

> Hi Krzysztof,
>
> the new, unified Source interface can only work as a scan source. Could
> you maybe elaborate a bit on the connector implementation you have and how
> you intend to have it work as a lookup source?
>
>
> Best
> Ingo
>
> On Thu, Nov 4, 2021 at 4:11 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Thanks Fabian and Ingo,
>> yes I forgot to add the reference links, so here they are:
>>
>> [1]
>> https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html
>> [2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part2
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/
>>
>> In my case I would really need a LookupTableSource and not
>> ScanTableSource since my use case and source will get data for given
>> parameters and I don't need to scan the entire resource.
>>
>> Cheers,
>>
>> On Thu, 4 Nov 2021 at 15:48, Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi,
>>> I was wondering if it is possible to implement a Source Table connector
>>> like it is described in [1][2] with custom source that implements a new
>>> Source interface [3] and not a SourceFunction.
>>>
>>> I already have my custom source, but when I try to implement a Table
>>> Source from LookupTableSource or ScanTableSource as presented in
>>> [1][2], it seems I need to have a SourceFunction object to be able to use
>>> ScanRuntimeProvider or LookupRuntimeProvider.
>>>
>>> In other words how can I use Source interface implementation in
>>> TableSource?
>>>
>>> Regards,
>>> Krzysztof Chmielewski
>>>
>>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Ingo Bürk
Hi Krzysztof,

the new, unified Source interface can only work as a scan source. Could you
maybe elaborate a bit on the connector implementation you have and how you
intend to have it work as a lookup source?


Best
Ingo

On Thu, Nov 4, 2021 at 4:11 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thanks Fabian and Ingo,
> yes I forgot to add the reference links, so here they are:
>
> [1] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html
> [2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part2
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/
>
> In my case I would really need a LookupTableSource and not ScanTableSource
> since my use case and source will get data for given parameters and I don't
> need to scan the entire resource.
>
> Cheers,
>
> On Thu, 4 Nov 2021 at 15:48, Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> I was wondering if it is possible to implement a Source Table connector
>> like it is described in [1][2] with custom source that implements a new
>> Source interface [3] and not a SourceFunction.
>>
>> I already have my custom source, but when I try to implement a Table
>> Source from LookupTableSource or ScanTableSource as presented in
>> [1][2], it seems I need to have a SourceFunction object to be able to use
>> ScanRuntimeProvider or LookupRuntimeProvider.
>>
>> In other words how can I use Source interface implementation in
>> TableSource?
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Krzysztof Chmielewski
Thanks Fabian and Ingo,
yes I forgot to add the reference links, so here they are:

[1] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html
[2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part2
[3]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/

In my case I would really need a LookupTableSource and not ScanTableSource
since my use case and source will get data for given parameters and I don't
need to scan the entire resource.

Cheers,

On Thu, 4 Nov 2021 at 15:48, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I was wondering if it is possible to implement a Source Table connector
> like it is described in [1][2] with custom source that implements a new
> Source interface [3] and not a SourceFunction.
>
> I already have my custom source, but when I try to implement a Table
> Source from LookupTableSource or ScanTableSource as presented in
> [1][2], it seems I need to have a SourceFunction object to be able to use
> ScanRuntimeProvider or LookupRuntimeProvider.
>
> In other words how can I use Source interface implementation in
> TableSource?
>
> Regards,
> Krzysztof Chmielewski
>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Ingo Bürk
Hi Krzysztof,

instead of SourceFunctionProvider you need to use SourceProvider. If you
look at the filesystem connector you can see an example for that, too
(FileSystemTableSource#createSourceProvider).


Best
Ingo

On Thu, Nov 4, 2021 at 3:48 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I was wondering if it is possible to implement a Source Table connector
> like it is described in [1][2] with custom source that implements a new
> Source interface [3] and not a SourceFunction.
>
> I already have my custom source, but when I try to implement a Table
> Source from LookupTableSource or ScanTableSource as presented in
> [1][2], it seems I need to have a SourceFunction object to be able to use
> ScanRuntimeProvider or LookupRuntimeProvider.
>
> In other words how can I use Source interface implementation in
> TableSource?
>
> Regards,
> Krzysztof Chmielewski
>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Fabian Paul
Hi Krzysztof,

It is great to hear that you have implemented your source with the unified 
Source interface. To integrate your source into the Table API you can use the 
SourceProvider. You can take a look at how our FileSource does it [1].

Btw I think you forgot to add your references ;) 

Best,
Fabian

[1] 
https://github.com/apache/flink/blob/1b4490654e0438332c2a4cac679c05a321d3f34c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L151
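
A condensed sketch of that pattern: a ScanTableSource handing the unified Source
to the planner via SourceProvider. The source creation is a placeholder for your
own implementation:

import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;

public class MyScanTableSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // Wraps a unified Source instead of a SourceFunction.
        Source<RowData, ?, ?> source = createMyUnifiedSource();
        return SourceProvider.of(source);
    }

    private Source<RowData, ?, ?> createMyUnifiedSource() {
        throw new UnsupportedOperationException("placeholder for the custom Source");
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanTableSource();
    }

    @Override
    public String asSummaryString() {
        return "MyScanTableSource";
    }
}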



Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Krzysztof Chmielewski
Hi,
I was wondering if it is possible to implement a Source Table connector
like it is described in [1][2] with custom source that implements a new
Source interface [3] and not a SourceFunction.

I already have my custom source, but when I try to implement a Table
Source from LookupTableSource or ScanTableSource as presented in
[1][2], it seems I need to have a SourceFunction object to be able to use
ScanRuntimeProvider or LookupRuntimeProvider.

In other words how can I use Source interface implementation in TableSource?

Regards,
Krzysztof Chmielewski


Re: Approach to test custom Source/Sink

2021-08-09 Thread JING ZHANG
Hi Bin,
We could try the following methods to cover the source/sink tests.
Unit test: To verify whether the behavior of each method in the custom source
or sink is as expected. You could mock interactions with external storage
(database, IO, etc.) in this part.
Integration test: To test whether the source/sink works well in a real
Flink job. You need to verify whether the custom connector behaves correctly
in the big picture, covering things such as checkpointing and reading/writing
to external storage.
You can find many UTs/ITs in the source code of Flink's built-in
connectors [1].

[1] https://github.com/apache/flink/tree/master/flink-connectors

Best,
JING ZHANG

On Tue, 10 Aug 2021 at 4:22 AM, Xinbin Huang wrote:

> Hi team,
>
> I'm currently implementing a custom source and sink, and I'm trying to
> find a way to test these implementations. The testing section
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators>
> in the official doc seems to only include testing for stateful/stateless
> operators. Do you have any recommendations on how I should approach this?
>
> Thanks
> Bin
>
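
As a concrete starting point for the integration-test side, a sketch following
the MiniClusterWithClientResource pattern from Flink's testing docs. The source
under test is stood in for by fromElements here; swap in your own connector:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MyConnectorITCase {

    @ClassRule
    public static final MiniClusterWithClientResource FLINK =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    // Static so the sink instances running inside the mini cluster share it.
    private static final List<String> RESULTS = new ArrayList<>();

    @Test
    public void testPipelineProducesRecords() throws Exception {
        RESULTS.clear();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements("a", "b", "c") // stand-in for env.addSource(yourSource)
                .addSink(new CollectSink());
        env.execute("connector IT");
        // assert on RESULTS here
    }

    private static class CollectSink implements SinkFunction<String> {
        @Override
        public void invoke(String value, Context context) {
            synchronized (RESULTS) {
                RESULTS.add(value);
            }
        }
    }
}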


Re: Approach to test custom Source/Sink

2021-08-09 Thread Caizhi Weng
Hi!

Currently there is no general principle for testing sources and sinks.
However, you might want to check out the unit tests and IT cases for Flink
connectors. For example, the flink-connector-jdbc module has a lot of tests
for the JDBC source and sink. Following the ideas in these tests should be
enough.

On Tue, 10 Aug 2021 at 4:22 AM, Xinbin Huang wrote:

> Hi team,
>
> I'm currently implementing a custom source and sink, and I'm trying to
> find a way to test these implementations. The testing section
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators>
> in the official doc seems to only include testing for stateful/stateless
> operators. Do you have any recommendations on how I should approach this?
>
> Thanks
> Bin
>


Approach to test custom Source/Sink

2021-08-09 Thread Xinbin Huang
Hi team,

I'm currently implementing a custom source and sink, and I'm trying to find
a way to test these implementations. The testing section
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators>
in the official doc seems to only include testing for stateful/stateless
operators. Do you have any recommendations on how I should approach this?

Thanks
Bin


Re: Custom Source with the new Data Source API

2021-08-04 Thread Yuval Itzchakov
Hi Bin,

Flink's Kafka source has been rewritten using the new Source API.
You can find it here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java

On Wed, Aug 4, 2021 at 8:51 PM Xinbin Huang  wrote:

> Hi team,
>
> I'm trying to develop a custom source using the new Data Source API, but I'm
> having a hard time finding examples for it. Can you point me to some
> existing sources implemented with the new Data Source API? It would be
> ideal if the source is a pull-based unbounded source (e.g. Kafka).
>
> Thanks!
> Bin
>


-- 
Best Regards,
Yuval Itzchakov.
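
For quick orientation, a new-style source is attached with env.fromSource rather
than addSource; a minimal usage sketch based on the documented KafkaSource
builder (broker, topic, and group id are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker
                .setTopics("input-topic")              // placeholder topic
                .setGroupId("my-group")                // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();
        env.execute("kafka source example");
    }
}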


Custom Source with the new Data Source API

2021-08-04 Thread Xinbin Huang
Hi team,

I'm trying to develop a custom source using the new Data Source API, but I'm
having a hard time finding examples for it. Can you point me to some
existing sources implemented with the new Data Source API? It would be
ideal if the source is a pull-based unbounded source (e.g. Kafka).

Thanks!
Bin


Re: Custom source with multiple, differently typed outputs

2021-02-11 Thread Arvid Heise
Hi Roman,

In general, the use of inconsistent types is discouraged but there is
little that you can do on your end.

I think your approach with SourceFunction is good, but I'd probably not use
Row right away but rather some POJO or source-format record. Note that I have
never seen side outputs in a source, so please check if it's working
properly. If not, you can probably do the same with a chained map with
almost no overhead. Then you'd probably need to use an intermediate data
type that is the union of the different schemas. So if you have Person and
Purchase records intermixed, you'd use a PersonOrPurchase intermediate type.

Then, to convert to a Table, please check the docs on how to map the data
types [1].

I'm assuming it is also possible to work directly with a Row, although I
haven't done that. But note that in general you cannot provide the
TypeInformation dynamically; it has to be known when you convert to a
Table. In that case, it might be easier to just have a POJO for each
possible type.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#mapping-of-data-types-to-table-schema



On Tue, Feb 9, 2021 at 5:32 PM Roman Karlstetter <
roman.karlstet...@gmail.com> wrote:

> Hi everyone,
>
> I want to connect to a proprietary data stream, which sends different
> types of messages (maybe interpreted as a table), intertwined in the
> stream. Every type of message (or table) can have a different schema, but
> for each type this schema is known when connecting (i.e., at runtime) and
> does not change.
>
> I'm new to Flink, so I have a few (stupid?) questions about this use case.
> I have created a custom SourceFunction which produces Rows read from this
> data stream. Then I use side outputs to split up this stream into multiple
> DataStream[Row]. Is this the right way to do it?
> What's the best way to add custom TypeInformation[Row] to each of those
> streams, so that I can easily map this to a table which can be accessed via
> the Table API? Or would I rather directly implement a ScanTableSource (I
> played with this, the SourceFunction approach was easier though)? I believe
> that Row is the best way to create this kind of schema at runtime, or is
> there a better alternative?
>
> Kind regards
> Roman
>
>
>
>
>
>
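
A sketch of the splitting step Arvid suggests, done in a chained ProcessFunction
with side outputs. The record types and the type discriminator in field 0 are
illustrative assumptions:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class StreamSplitter {

    // Anonymous subclasses so the generic type survives erasure.
    public static final OutputTag<Row> PERSON = new OutputTag<Row>("person") {};
    public static final OutputTag<Row> PURCHASE = new OutputTag<Row>("purchase") {};

    /** Routes each record of the mixed stream to the side output for its type. */
    public static SingleOutputStreamOperator<Row> split(DataStream<Row> mixed) {
        return mixed.process(new ProcessFunction<Row, Row>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Row> out) {
                // Assumes field 0 carries the message type.
                if ("person".equals(value.getField(0))) {
                    ctx.output(PERSON, value);
                } else {
                    ctx.output(PURCHASE, value);
                }
            }
        });
    }
}

Usage: DataStream<Row> persons = split(mixed).getSideOutput(PERSON); each
side-output stream can then be given its own schema when converting to a Table.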


Custom source with multiple, differently typed outputs

2021-02-09 Thread Roman Karlstetter
Hi everyone,

I want to connect to a proprietary data stream, which sends different types
of messages (maybe interpreted as a table), intertwined in the stream.
Every type of message (or table) can have a different schema, but for each
type this schema is known when connecting (i.e., at runtime) and does not
change.

I'm new to Flink, so I have a few (stupid?) questions about this use case.
I have created a custom SourceFunction which produces Rows read from this
data stream. Then I use side outputs to split up this stream into multiple
DataStream[Row]. Is this the right way to do it?
What's the best way to add custom TypeInformation[Row] to each of those
streams, so that I can easily map this to a table which can be accessed via
the Table API? Or would I rather directly implement a ScanTableSource (I
played with this, the SourceFunction approach was easier though)? I believe
that Row is the best way to create this kind of schema at runtime, or is
there a better alternative?

Kind regards
Roman


Re: Collect output of transformations on a custom source in real time

2016-05-26 Thread Stephan Ewen
Hi!

I am not sure I understand the problem exactly, but one problem I see in
your code is that you call "execute()" first and then "DataStreamUtils.collect(
datastream);"

The first call to "env.execute()" will start the program (source and
filter) and the results will simply go nowhere.
Then you call "DataStreamUtils.collect(datastream);", which internally
calls "execute" again.

In short: remove the first call to "env.execute()"; that should do the
trick.

Stephan


On Thu, May 26, 2016 at 5:09 PM, Ahmed Nader <ahmednader...@gmail.com>
wrote:

> Hello,
> I have defined a custom source function for an infinite stream source,
> where in my overridden run method I have a while(true) loop to keep
> listening for the input. I want to apply some transformations on the
> resulting datastream from my source and collect the output so far of these
> transformations in a collection.
> However, when I leave my source running in an infinite loop, nothing is
> really executed.
> Here are some parts of my code to clarify more:
>
> my custom source class:
> public class FeedSource implements SourceFunction
>
> The run method in this class has a while(boolean variable == true)
>
> Then I call my source and apply filter on it:
> datastream = env.addSource(new FeedSource()).filter();
>
> then execute:
> env.execute();
>
> I want then to collect my datastream in a collection:
> Iterator iter = DataStreamUtils.collect(datastream);
>
> So, first of all, is it possible to apply a filter on my stream that way? And
> then, if I'm able to do so, is it possible to keep updating my collection
> with the content of my datastream so far?
>
> I hope I was able to make my question clear enough.
> Thanks,
> Ahmed
>
>
>
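
In other words, the corrected sequence looks roughly like this. Note that the
package of DataStreamUtils has moved between Flink versions, and FeedSource is
the custom source from the question above:

import java.util.Iterator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> datastream =
                env.addSource(new FeedSource()).filter(v -> v != null);

        // No env.execute() before this: collect() submits the job itself and
        // hands elements back while the (still running) job produces them.
        Iterator<String> iter = DataStreamUtils.collect(datastream);
        while (iter.hasNext()) {
            String element = iter.next();
            // append element to your collection here
        }
    }
}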


Collect output of transformations on a custom source in real time

2016-05-26 Thread Ahmed Nader
Hello,
I have defined a custom source function for an infinite stream source,
where in my overridden run method I have a while(true) loop to keep
listening for the input. I want to apply some transformations on the
resulting datastream from my source and collect the output so far of these
transformations in a collection.
However, when I leave my source running in an infinite loop, nothing is
really executed.
Here are some parts of my code to clarify more:

my custom source class:
public class FeedSource implements SourceFunction

The run method in this class has a while(boolean variable == true)

Then I call my source and apply filter on it:
datastream = env.addSource(new FeedSource()).filter();

then execute:
env.execute();

I want then to collect my datastream in a collection:
Iterator iter = DataStreamUtils.collect(datastream);

So, first of all, is it possible to apply a filter on my stream that way? And
then, if I'm able to do so, is it possible to keep updating my collection
with the content of my datastream so far?

I hope I was able to make my question clear enough.
Thanks,
Ahmed


Re: Custom Source Function for reading JSON?

2016-05-15 Thread iñaki williams
Hi!

I have the class; I want to create objects using the values extracted from
the JSON text, my fault haha.

Sorry for that

2016-05-15 19:10 GMT+02:00 Gábor Horváth :

> Hi!
>
> On 14 May 2016 at 23:47, iñaki williams  wrote:
>
>> Hi Flink Community!
>>
>> I am new to Apache Flink and I have a problem reading a JSON.
>>
>> I am using a JSON from a webpage; this JSON is changing continuously, so I
>> decided to use my own Source Function in order to grab the JSON using a URL
>> that is always the same: http://www.example/blabla/this.json
>>
>> I have been looking at the other Source Functions on GitHub, but I am not
>> able to read it. I want to take the JSON, parse it, and transform it into a
>> Java class with some of the attributes and values that I need.
>>
>
> Sorry, but this part is not clear to me. Do you plan to create new classes
> at runtime and instantiate objects, or do you have the class at compile
> time and want to create objects using the values extracted from the JSON
> text?
>
>
>>
>> When I compare other examples, they use SourceContext with String, but I
>> don't really want to send a String. I got stuck at that part and I am
>> looking for the best way of doing this.
>>
>> Thanks for your attention!
>>
>> Iñaki
>>
>>
> Regards,
> Gábor
>
>


Re: Custom Source Function for reading JSON?

2016-05-15 Thread Gábor Horváth
Hi!

On 14 May 2016 at 23:47, iñaki williams  wrote:

> Hi Flink Community!
>
> I am new to Apache Flink and I have a problem reading a JSON.
>
> I am using a JSON from a webpage; this JSON is changing continuously, so I
> decided to use my own Source Function in order to grab the JSON using a URL
> that is always the same: http://www.example/blabla/this.json
>
> I have been looking at the other Source Functions on GitHub, but I am not
> able to read it. I want to take the JSON, parse it, and transform it into a
> Java class with some of the attributes and values that I need.
>

Sorry, but this part is not clear to me. Do you plan to create new classes
at runtime and instantiate objects, or do you have the class at compile
time and want to create objects using the values extracted from the JSON
text?


>
> When I compare other examples, they use SourceContext with String, but I
> don't really want to send a String. I got stuck at that part and I am
> looking for the best way of doing this.
>
> Thanks for your attention!
>
> Iñaki
>
>
Regards,
Gábor
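
A sketch of the pattern discussed in this thread: polling the fixed URL, parsing
the JSON into a POJO with a library such as Jackson, and emitting the POJO
through a typed SourceContext. The POJO fields and poll interval are
assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JsonPollingSource implements SourceFunction<JsonPollingSource.Event> {

    /** POJO with only the attributes needed; extra JSON fields are ignored. */
    public static class Event {
        public String id;
        public double value;
    }

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://www.example/blabla/this.json"))
                .build();
        while (running) {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            // Parse straight into the domain type instead of emitting Strings.
            Event event = mapper.readValue(response.body(), Event.class);
            ctx.collect(event);
            Thread.sleep(1_000); // assumed poll interval
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}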