Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Sumeet Malhotra
Hi,

I'm currently submitting my Python user code from my local machine to a
Flink cluster running in Session mode on Kubernetes. For this, I have a
custom Flink image with Python as per this reference [1].

Now, I'd like to move to using the Application mode with Native Kubernetes,
where the user code is embedded within the container image. For Java, the
process is described here [2]. This basically requires the user application
jar to be copied to the $FLINK_HOME/usrlib directory. I couldn't find any
documentation on where the Python user code should live. Can it be packaged at
any location and invoked?

Thanks,
Sumeet


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode


Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
I'm using a standalone deployment on Kubernetes for this use case. Does the
archive get uploaded to the cluster via the :8081 REST/WebUI port or via
some other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering if not
exposing those ports on the local machine might prevent the archive from
getting uploaded, although I would have expected an explicit error in that
case.

NAMESPACE   NAME               TYPE        PORTS
flink       flink-jobmanager   ClusterIP   rpc:6123►0  blob-server:6124►0  webui:8081►0

Thanks,
Sumeet


On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan  wrote:

> Hi Sumeet,
>
> Probably there is an issue with uploading the archive while submitting the
> job.
> The commands and API usage look good to me.
> Dian could you please confirm that?
>
> Regards,
> Roman
>
> On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
>  wrote:
> >
> > Thank you Roman. Yes, that's what I am going to do.
> >
> > But I'm running into another issue... when I specify the --pyArchives
> option on the command line, the job never gets submitted and is stuck
> forever. And when I try to programmatically do this by calling
> add_python_archive(), the job gets submitted but fails because the target
> directory is not found on the UDF node. Flink is deployed on a K8S cluster
> in my case and the port 8081 is forwarded to the localhost.
> >
> > Here's the command line I use:
> >
> > ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
> my_job.py  --pyArchives file:///path/to/schema.zip#schema
> >
> > And within the UDF I'm accessing the schema file as:
> >
> > read_schema('schema/my_schema.json')
> >
> > Or if I try using the API instead of the command-line, the call looks as:
> >
> > env = StreamExecutionEnvironment.get_execution_environment()
> > env.add_python_archive('schema.zip', 'schema')
> >
> > Initially, my_job.py itself had its own command line options, and I was
> thinking that might interfere with the overall Flink command line options,
> but even after removing that I'm not able to submit the job anymore.
> However, if I don't use the --pyArchives option and manually transfer the
> schema file to a location on the UDF node, the job gets submitted and works
> as expected.
> >
> > Any reason why this might happen?
> >
> > Thanks,
> > Sumeet
> >
> >
> > On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> I think the second option is what you need. The documentation says
> >> only zip format is supported.
> >> Alternatively, you could upload the files to S3 or other DFS and
> >> access from TMs and re-upload when needed.
> >>
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
> >>
> >> Regards,
> >> Roman
> >>
> >> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
> >>  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using UDTFs in PyFlink, that depend upon a few resource files
> (JSON schema files actually). The path of this file can be passed into the
> UDTF, but essentially this path needs to exist on the Task Manager node
> where the task executes. What's the best way to upload these resource
> files? As of now, my custom Flink image creates a fixed path with the
> required resource files, but I'd like it to be run time configurable.
> >> >
> >> > There are 2 APIs available to load files when submitting a PyFlink
> job...
> >> >
> >> > stream_execution_environment.add_python_file() - Recommended to
> upload files (.py etc) but doesn't let me configure the final path on the
> target node. The files are added to PYTHONPATH, but the UDTF function then
> needs to look up this file. I'd like to pass the file location into
> the UDTF instead.
> >> >
> >> > stream_execution_environment.add_python_archive() - Appears to be
> more generic, in the sense that it allows a target directory to be
> specified. The documentation doesn't say anything about the contents of the
> archive, so I'm guessing it could be any type of file. Is this what is
> needed for my use case?
> >> >
> >> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >> >
> >> > Thanks in advance,
> >> > Sumeet
>


Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
Thank you Roman. Yes, that's what I am going to do.

But I'm running into another issue... when I specify the *--pyArchives*
option on the command line, the job never gets submitted and is stuck
forever. And when I try to programmatically do this by calling
*add_python_archive()*, the job gets submitted but fails because the target
directory is not found on the UDF node. Flink is deployed on a K8S cluster
in my case and the port 8081 is forwarded to the localhost.

Here's the command line I use:

~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
my_job.py  --pyArchives file:///path/to/schema.zip#schema

And within the UDF I'm accessing the schema file as:

read_schema('schema/my_schema.json')

Or if I try using the API instead of the command-line, the call looks as:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive('schema.zip', 'schema')
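
To make the intent concrete, the full flow I'm aiming for looks roughly like the
sketch below (paths and the read_schema_field UDF are illustrative; my
understanding is that the relative path resolves against the Python worker's
working directory, where the archive gets extracted):

import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
# schema.zip must exist locally at submission time; it is shipped with the job
# and extracted into a directory named "schema" on each Python worker.
env.add_python_archive('/path/to/schema.zip', 'schema')
t_env = StreamTableEnvironment.create(env)

@udf(result_type=DataTypes.STRING())
def read_schema_field(name):
    # Relative path, resolved against the worker's working directory.
    with open('schema/my_schema.json') as f:
        return str(json.load(f).get(name))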

Initially, my_job.py itself had its own command line options, and I was
thinking that might interfere with the overall Flink command line options,
but even after removing that I'm not able to submit the job anymore.
However, if I don't use the --pyArchives option and manually transfer the
schema file to a location on the UDF node, the job gets submitted and works
as expected.

Any reason why this might happen?

Thanks,
Sumeet


On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan  wrote:

> Hi,
>
> I think the second option is what you need. The documentation says
> only zip format is supported.
> Alternatively, you could upload the files to S3 or other DFS and
> access from TMs and re-upload when needed.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>
> Regards,
> Roman
>
> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>  wrote:
> >
> > Hi,
> >
> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON
> schema files actually). The path of this file can be passed into the UDTF,
> but essentially this path needs to exist on the Task Manager node where the
> task executes. What's the best way to upload these resource files? As of
> now, my custom Flink image creates a fixed path with the required resource
> files, but I'd like it to be run time configurable.
> >
> > There are 2 APIs available to load files when submitting a PyFlink job...
> >
> > stream_execution_environment.add_python_file() - Recommended to upload
> files (.py etc) but doesn't let me configure the final path on the target
> node. The files are added to PYTHONPATH, but the UDTF function then needs to
> look up this file. I'd like to pass the file location into the UDTF
> instead.
> >
> > stream_execution_environment.add_python_archive() - Appears to be more
> generic, in the sense that it allows a target directory to be specified.
> The documentation doesn't say anything about the contents of the archive,
> so I'm guessing it could be any type of file. Is this what is needed for my
> use case?
> >
> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >
> > Thanks in advance,
> > Sumeet
>


PyFlink: Upload resource files to Flink cluster

2021-06-09 Thread Sumeet Malhotra
Hi,

I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON
schema files actually). The path of this file can be passed into the UDTF,
but essentially this path needs to exist on the Task Manager node where the
task executes. What's the best way to upload these resource files? As of
now, my custom Flink image creates a fixed path with the required resource
files, but I'd like it to be run time configurable.

There are 2 APIs available to load files when submitting a PyFlink job...

*stream_execution_environment.add_python_file()* - Recommended to upload
files (.py etc) but doesn't let me configure the final path on the target
node. The files are added to PYTHONPATH, but the UDTF function then needs to
look up this file. I'd like to pass the file location into the UDTF
instead.

*stream_execution_environment.add_python_archive()* - Appears to be more
generic, in the sense that it allows a target directory to be specified.
The documentation doesn't say anything about the contents of the archive,
so I'm guessing it could be any type of file. Is this what is needed for my
use case?

Or is there any other recommended way to upload non-Python
dependencies/resources?

Thanks in advance,
Sumeet


Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Thanks Xingbo! The workaround will probably work for now, at least it
avoids having to refer to index values in the rest of the function.

Cheers,
Sumeet


On Wed, May 19, 2021 at 3:02 PM Xingbo Huang  wrote:

> Hi Sumeet,
>
> Due to the limitation of the original PyFlink serializers design, there is
> no way to pass attribute names to Row in row-based operations. In
> release-1.14, I am reconstructing the implementations of serializers[1].
> After completion, accessing attribute names of `Row` in row-based
> operations will be supported[2].
>
> About the work around way in releases-1.13, maybe you need to manually set
> the field_names of Row. e.g.
> ```
> def my_table_tranform_fn(x: Row):
>     x.set_field_names(['a', 'b', 'c'])
>     ...
> ```
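
For reference, a fleshed-out version of this workaround would look roughly like
the sketch below (column names, result types and the function body are
illustrative, and it assumes Row.set_field_names is available in the PyFlink
version in use, as suggested above):

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def my_flat_map_fn(x: Row):
    # Attach field names so attribute access works inside the UDTF.
    x.set_field_names(['a', 'b', 'c'])
    # Fields can now be read as x.a / x.b / x.c instead of x[0] / x[1] / x[2].
    yield x.a, x.b
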
>
> [1] https://issues.apache.org/jira/browse/FLINK-22612
> [2] https://issues.apache.org/jira/browse/FLINK-22712
>
> Best,
> Xingbo
>
> Sumeet Malhotra  wrote on Wed, May 19, 2021 at 4:45 PM:
>
>> Hi,
>>
>> According to the documentation for PyFlink Table row based operations
>> [1], typical usage is as follows:
>>
>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
>> def split(x: Row) -> Row:
>>     for s in x[1].split(","):
>>         yield x[0], s
>>
>> table.flat_map(split)
>>
>> Is there any way that row fields inside the UDTF can be accessed by
>> their attribute names instead of array index? In my use case, I'm doing the
>> following:
>>
>> raw_data = t_env.from_path('MySource')
>> raw_data \
>>     .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c')) \
>>     .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
>>     .execute_insert("MySink")
>>
>> In the table function `my_flat_map_fn` I'm unable to access the fields
>> of the row by their attribute names i.e., assuming the input argument to
>> the table function is x, I cannot access fields as x.a, x.b or x.c, instead
>> I have to use x[0], x[1] and x[2]. The error I get is that _fields is not
>> populated.
>>
>> In my use case, the number of columns is very high, and working with
>> indexes is error-prone and hard to maintain.
>>
>> Any suggestions?
>>
>> Thanks,
>> Sumeet
>>
>>


Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Hi,

According to the documentation for PyFlink Table row based operations [1],
typical usage is as follows:

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

table.flat_map(split)

Is there any way that row fields inside the UDTF can be accessed by
their attribute names instead of array index? In my use case, I'm doing the
following:

raw_data = t_env.from_path('MySource')
raw_data \
    .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c')) \
    .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
    .execute_insert("MySink")

In the table function `my_flat_map_fn` I'm unable to access the fields of
the row by their attribute names i.e., assuming the input argument to the
table function is x, I cannot access fields as x.a, x.b or x.c, instead I
have to use x[0], x[1] and x[2]. The error I get is that _fields is not
populated.

In my use case, the number of columns is very high, and working with indexes
is error-prone and hard to maintain.

Any suggestions?

Thanks,
Sumeet


PyFlink: Split input table stream using filter()

2021-05-05 Thread Sumeet Malhotra
Hi,

I would like to split streamed data from Kafka into 2 streams based on some
filter criteria, using PyFlink Table API. As described here [1], a way to
do this is to use .filter() which should split the stream for parallel
processing.

Does this approach work in Table API as well? I'm doing the following, but
control never reaches the second stream.

input = t_env.from_path('TableName')
stream1 = input.filter().select(...)...
stream2 = input.filter().select(...)...

When I execute this, I only see the first stream getting processed. Control
never reaches stream2. I have set parallelism to 2.

Am I missing something? Or is this only supported in Datastreams?
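
For completeness, a fuller sketch of what I'm attempting is below (table, sink
and column names are made up). One thing I notice while writing it down: if each
branch is executed with its own execute_insert().wait(), the first wait() blocks
on the unbounded job before the second branch is ever submitted, so routing both
inserts through a single StatementSet keeps them in one job:

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# Assumes 'TableName', 'SinkA' and 'SinkB' are already registered (e.g. via DDL).
input = t_env.from_path('TableName')

stream1 = input.filter(col('kind') == 'a').select(col('id'), col('value'))
stream2 = input.filter(col('kind') == 'b').select(col('id'), col('value'))

# Both INSERTs go into one StatementSet and are submitted as a single job.
stmt_set = t_env.create_statement_set()
stmt_set.add_insert('SinkA', stream1)
stmt_set.add_insert('SinkB', stream2)
stmt_set.execute().wait()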

Thanks in advance,
Sumeet

[1]:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream


Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Sumeet Malhotra
Thanks Dian. Yes, I hadn't looked at the 1.13.0 documentation earlier.
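
For anyone landing on this thread later, the 1.13-style keyed state usage looks
roughly like the sketch below (minimal and untested, with made-up data):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountPerKey(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        # One ValueState instance per key, managed by Flink.
        self.count = runtime_context.get_state(
            ValueStateDescriptor('count', Types.LONG()))

    def process_element(self, value, ctx):
        current = (self.count.value() or 0) + 1
        self.count.update(current)
        yield value[0], current

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [('a', 1), ('a', 2), ('b', 1)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
ds.key_by(lambda r: r[0]) \
  .process(CountPerKey(), output_type=Types.TUPLE([Types.STRING(), Types.LONG()])) \
  .print()
env.execute('keyed_state_sketch')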

On Wed, May 5, 2021 at 1:46 PM Dian Fu  wrote:

> Hi Sumeet,
>
> This feature is supported in 1.13.0 which was just released and so there
> is no documentation about it in 1.12.
>
> Regards,
> Dian
>
> On May 4, 2021, at 2:09 AM, Sumeet Malhotra  wrote:
>
> Hi,
>
> Is keyed state [1] supported by PyFlink yet? I can see some code for it in
> the Flink master branch, but there's no mention of it in the 1.12 Python
> documentation.
>
> Thanks,
> Sumeet
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
>
>
>


Define rowtime on intermediate table field

2021-05-04 Thread Sumeet Malhotra
Hi,

My use case involves reading raw data records from Kafka and processing
them. The records are coming from a database, where a periodic job reads
new rows, packages them into a single JSON object (as described below) and
writes the entire record to Kafka.

{
    'id': 'some_id',
    'key_a': 'value_a',
    'key_b': 'value_b',
    'result': {
        'columns': [
            'col_a',
            'col_b',
            'col_c',
            'col_d'
        ],
        'rows': [
            ['2021-05-04T05:23:13.953610Z', '655361', '8013', '0'],
            ['2021-05-04T05:23:13.953610Z', '655362', '4000', '456'],
            ['2021-05-04T05:23:13.953610Z', '655363', '2', '562'],
            ...
            ...
        ]
    }
}

As can be seen, the row time is actually embedded in the `result` object.

What I'm doing at the moment is to run this data through a user defined
table function, which parses the `result` object as a string, and emits
multiple rows that include the timestamp field. This is working fine.

In the next step, I would want to perform windowing on this transformed
data. That requires defining the event time attribute along with the
watermark. As I understand, this can be done either during the initial
table DDL definition or during conversion to a datastream.

Since I extract the timestamp value only after reading from Kafka, how can
I define an event time attribute on the intermediate table that's basically
a result of the user defined table function?

The only solution I can think of at the moment is to write the intermediate
table back to Kafka, and then create a new consumer that reads from Kafka,
where I can define the event time attribute as part of its DDL. This most
likely won't be good performance-wise. Is there any other way I can define
event time on the results of my user-defined table function?
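
One direction I'm considering, instead of the Kafka round trip, is to convert
the intermediate table to a DataStream, assign timestamps and watermarks there,
and convert back while declaring a rowtime attribute. A rough, unverified sketch
of that idea (field types and names are illustrative; 'intermediate' is the
output of the user-defined table function and t_env is a StreamTableEnvironment):

from pyflink.common import Duration
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.table.expressions import col

class RowTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        # The UDTF in this sketch emits epoch milliseconds as the last field.
        return value[2]

ds = t_env.to_append_stream(
    intermediate,
    Types.ROW([Types.STRING(), Types.STRING(), Types.LONG()]))

ds = ds.assign_timestamps_and_watermarks(
    WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(RowTimestampAssigner()))

# Declare the timestamp field as the event-time (rowtime) attribute on the way back.
windowed_input = t_env.from_data_stream(ds, col('b'), col('c'), col('ts').rowtime)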

Thanks in advance,
Sumeet


Is keyed state supported in PyFlink?

2021-05-03 Thread Sumeet Malhotra
Hi,

Is keyed state [1] supported by PyFlink yet? I can see some code for it in
the Flink master branch, but there's no mention of it in the 1.12 Python
documentation.

Thanks,
Sumeet

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html


Re: Best practice for packaging and deploying Flink jobs on K8S

2021-05-02 Thread Sumeet Malhotra
Thanks for updating the documentation Dian. Appreciate it.

..Sumeet

On Sun, May 2, 2021 at 10:53 AM Dian Fu  wrote:

> Hi Sumeet,
>
> FYI: the documentation about the CLI options of PyFlink has already been
> updated [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
>
> Regards,
> Dian
>
>
> On Thu, Apr 29, 2021 at 4:46 PM Dian Fu  wrote:
>
>> Hi Sumeet,
>>
>> For the Python dependencies, multiple ways have been provided to specify
>> them, and you could use any of them.
>>
>> Regarding requirements.txt, there are 3 ways provided, and you could
>> specify it via any of them:
>> - API inside the code: set_python_requirements
>> - command line option: -pyreq [1]
>> - configuration: python.requirements
>>
>> So you don’t need to specify them both inside the code and the command
>> line options.
>>
>> PS: It seems that -pyreq is missing from the latest CLI documentation;
>> the option itself still exists, so you could refer to the 1.11 documentation
>> for now. I'll try to add it back ASAP.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
>>
>> Regards,
>> Dian
>>
>> On Apr 29, 2021, at 3:24 PM, Sumeet Malhotra  wrote:
>>
>> Hi Till,
>>
>> There’s no problem with the documented approach. I was looking if there
>> were any standardized ways of organizing, packaging and deploying Python
>> code on a Flink cluster.
>>
>> Thanks,
>> Sumeet
>>
>>
>>
>> On Thu, Apr 29, 2021 at 12:37 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Sumeet,
>>>
>>> Is there a problem with the documented approaches on how to submit the
>>> Python program (not working) or are you asking in general? Given the
>>> documentation, I would assume that you can configure the requirements.txt
>>> via `set_python_requirements`.
>>>
>>> I am also pulling in Dian who might be able to tell you more about the
>>> Python deployment options.
>>>
>>> If you are not running on a session cluster, then you can also create a
>>> K8s image which contains your user code. That way you ship your job when
>>> deploying the cluster.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 28, 2021 at 10:17 AM Sumeet Malhotra <
>>> sumeet.malho...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a PyFlink job that consists of:
>>>>
>>>>- Multiple Python files.
>>>>- Multiple 3rdparty Python dependencies, specified in a
>>>>`requirements.txt` file.
>>>>- A few Java dependencies, mainly for external connectors.
>>>>- An overall job config YAML file.
>>>>
>>>> Here's a simplified structure of the code layout.
>>>>
>>>> flink/
>>>> ├── deps
>>>> │   ├── jar
>>>> │   │   ├── flink-connector-kafka_2.11-1.12.2.jar
>>>> │   │   └── kafka-clients-2.4.1.jar
>>>> │   └── pip
>>>> │   └── requirements.txt
>>>> ├── conf
>>>> │   └── job.yaml
>>>> └── job
>>>> ├── some_file_x.py
>>>> ├── some_file_y.py
>>>> └── main.py
>>>>
>>>> I'm able to execute this job running it locally i.e. invoking something
>>>> like:
>>>>
>>>> python main.py --config 
>>>>
>>>> I'm loading the jars inside the Python code, using env.add_jars(...).
>>>>
>>>> Now, the next step is to submit this job to a Flink cluster running on
>>>> K8S. I'm looking for any best practices in packaging and specifying
>>>> dependencies that people tend to follow. As per the documentation here [1],
>>>> various Python files, including the conf YAML, can be specified using the
>>>> --pyFiles option and Java dependencies can be specified using --jarfile
>>>> option.
>>>>
>>>> So, how can I specify 3rdparty Python package dependencies? According
>>>> to another piece of documentation here [2], I should be able to specify the
>>>> requirements.txt directly inside the code and submit it via the --pyFiles
>>>> option. Is that right?
>>>>
>>>> Are there any other best practices folks use to package/submit jobs?
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
>>>>
>>>
>>


Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Sumeet Malhotra
Hi Till,

There’s no problem with the documented approach. I was looking if there
were any standardized ways of organizing, packaging and deploying Python
code on a Flink cluster.

Thanks,
Sumeet



On Thu, Apr 29, 2021 at 12:37 PM Till Rohrmann  wrote:

> Hi Sumeet,
>
> Is there a problem with the documented approaches on how to submit the
> Python program (not working) or are you asking in general? Given the
> documentation, I would assume that you can configure the requirements.txt
> via `set_python_requirements`.
>
> I am also pulling in Dian who might be able to tell you more about the
> Python deployment options.
>
> If you are not running on a session cluster, then you can also create a
> K8s image which contains your user code. That way you ship your job when
> deploying the cluster.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:17 AM Sumeet Malhotra <
> sumeet.malho...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a PyFlink job that consists of:
>>
>>- Multiple Python files.
>>- Multiple 3rdparty Python dependencies, specified in a
>>`requirements.txt` file.
>>- A few Java dependencies, mainly for external connectors.
>>- An overall job config YAML file.
>>
>> Here's a simplified structure of the code layout.
>>
>> flink/
>> ├── deps
>> │   ├── jar
>> │   │   ├── flink-connector-kafka_2.11-1.12.2.jar
>> │   │   └── kafka-clients-2.4.1.jar
>> │   └── pip
>> │   └── requirements.txt
>> ├── conf
>> │   └── job.yaml
>> └── job
>> ├── some_file_x.py
>> ├── some_file_y.py
>> └── main.py
>>
>> I'm able to execute this job running it locally i.e. invoking something
>> like:
>>
>> python main.py --config 
>>
>> I'm loading the jars inside the Python code, using env.add_jars(...).
>>
>> Now, the next step is to submit this job to a Flink cluster running on
>> K8S. I'm looking for any best practices in packaging and specifying
>> dependencies that people tend to follow. As per the documentation here [1],
>> various Python files, including the conf YAML, can be specified using the
>> --pyFiles option and Java dependencies can be specified using --jarfile
>> option.
>>
>> So, how can I specify 3rdparty Python package dependencies? According to
>> another piece of documentation here [2], I should be able to specify the
>> requirements.txt directly inside the code and submit it via the --pyFiles
>> option. Is that right?
>>
>> Are there any other best practices folks use to package/submit jobs?
>>
>> Thanks,
>> Sumeet
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
>>
>


Best practice for packaging and deploying Flink jobs on K8S

2021-04-28 Thread Sumeet Malhotra
Hi,

I have a PyFlink job that consists of:

   - Multiple Python files.
   - Multiple 3rdparty Python dependencies, specified in a
   `requirements.txt` file.
   - A few Java dependencies, mainly for external connectors.
   - An overall job config YAML file.

Here's a simplified structure of the code layout.

flink/
├── deps
│   ├── jar
│   │   ├── flink-connector-kafka_2.11-1.12.2.jar
│   │   └── kafka-clients-2.4.1.jar
│   └── pip
│   └── requirements.txt
├── conf
│   └── job.yaml
└── job
├── some_file_x.py
├── some_file_y.py
└── main.py

I'm able to execute this job running it locally i.e. invoking something
like:

python main.py --config 

I'm loading the jars inside the Python code, using env.add_jars(...).

Now, the next step is to submit this job to a Flink cluster running on K8S.
I'm looking for any best practices in packaging and specifying dependencies
that people tend to follow. As per the documentation here [1], various
Python files, including the conf YAML, can be specified using the --pyFiles
option and Java dependencies can be specified using --jarfile option.

So, how can I specify 3rdparty Python package dependencies? According to
another piece of documentation here [2], I should be able to specify the
requirements.txt directly inside the code and submit it via the --pyFiles
option. Is that right?
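
To make the question concrete, the in-code wiring I understand to be possible
looks roughly like this (paths follow the layout above; set_python_requirements
is the API mentioned in the replies above):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Java connector dependencies, same as the env.add_jars(...) call mentioned above.
env.add_jars(
    'file:///path/to/flink/deps/jar/flink-connector-kafka_2.11-1.12.2.jar',
    'file:///path/to/flink/deps/jar/kafka-clients-2.4.1.jar')

t_env = StreamTableEnvironment.create(env)
# Third-party Python packages: Flink ships requirements.txt and installs the
# packages on the workers (equivalent to the -pyreq CLI option / the
# python.requirements configuration).
t_env.set_python_requirements('/path/to/flink/deps/pip/requirements.txt')
# The job's own Python modules and the config YAML can still go through
# add_python_file() / the --pyFiles command line option.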

Are there any other best practices folks use to package/submit jobs?

Thanks,
Sumeet

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program


Re: Accessing columns from input stream table during Window operations

2021-04-20 Thread Sumeet Malhotra
Thanks Dian, Guowei. I think it makes sense to roll with this approach.
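
Concretely, the adjusted pipeline looks roughly like this (untested as written;
'input' is the table with columns [Timestamp, a, b, c] from the simplified
example quoted below):

from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

input \
    .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias('w')) \
    .group_by(col('w'), input.a, input.b) \
    .select(
        input.a,                 # constant metadata, now directly selectable
        input.b,                 # per-sensor key
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()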

On Tue, Apr 20, 2021 at 8:29 AM Guowei Ma  wrote:

> Hi, Sumeet
> Thanks you for the sharing. As Dian suggested, I think you could use b as
> your `group_by`'s key and so the b could be output directly.
> I think it is more simple.
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 7:31 PM Dian Fu  wrote:
>
>> Hi Sumeet,
>>
>> Thanks for the sharing.
>>
>> Then I guess you could use `.group_by(col('w'), input.a, input.b)`.
>> Since the value for input.a is always the same, it’s equal to group_by(
>> col(‘w'), input.b) logically. The benefit is that you could access
>> input.a directly in the select clause.
>>
>> Regards,
>> Dian
>>
>> On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra  wrote:
>>
>> Hi Guowei,
>>
>> Let me elaborate the use case with an example.
>>
>> Sample input table looks like this:
>>
>> time    a     b     c
>> ----------------------
>> t0      a0    b0    1
>> t1      a0    b1    2
>> t2      a0    b2    3
>> t3      a0    b0    6
>> t4      a0    b1    7
>> t5      a0    b2    8
>>
>> Basically, every time interval there are new readings from a fixed set of
>> sensors (b0, b1 and b2). All these rows have a few constant fields
>> representing metadata about the input (a0).
>>
>> Desired output for every time interval is the average reading for every
>> sensor (b0, b1, b2), along with the constant metadata (a0):
>>
>> a0    b0    avg(c)
>> a0    b1    avg(c)
>> a0    b2    avg(c)
>>
>> This is what I was trying to build using a simple Tumble window:
>>
>> input \
>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>     .group_by(col('w'), input.b) \
>>     .select(
>>         input.a,    # <=== constant metadata field, same for every input record
>>         input.b,    # <=== group_by field, to compute averages
>>         input.c.avg.alias('avg_value')) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> The example above is highly simplified, but I hope it explains what I'm
>> trying to achieve.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu  wrote:
>>
>>> Hi Sumeet,
>>>
>>> 1) Regarding the above exception, it's a known issue and has been
>>> fixed in FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> 
>>> [1]. It
>>> will be available in the coming 1.12.3. You could also cherry-pick that fix
>>> to 1.12.2 and build from source following the instruction described in [2]
>>> if needed.
>>>
>>> 2) Regarding your requirements, could you describe what you want to
>>> do with group window or over window?
>>> For group window(e.g. tumble window, hop window, session window, etc),
>>> it will output one row for multiple inputs belonging to the same window.
>>> You cannot simply pass it through from input to sink, as it is
>>> non-deterministic which row to use when there are multiple input rows. That's
>>> the reason why you have to declare a field in the group by clause if you
>>> want to access it directly in the select clause. For over window, it will
>>> output one row for each input and so you could pass through it directly.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21922.
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>>
>>>
>>> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra  wrote:
>>>
>>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>>
>>> input \
>>>     .over_window(
>>>         Over.partition_by(col(input.a)) \
>>>             .order_by(input.Timestamp) \
>>>             .preceding(lit(10).seconds) \
>>>             .alias('w')) \
>>>     .select(
>>>         input.b,
>>>         input.c.avg.over(col('w'))) \
>>>     .execute_insert('MySink') \
>>>     .wait()
>>>
>>> But running into following exception:
>>>
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> z:org.apache.flink.table.api.Over.partitionBy. Trace:
>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>>
>>> Is there any extra Jar that needs to be included for Over Windows? From
>>> the code it doesn't appear so.

Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
Hi Guowei,

Let me elaborate the use case with an example.

Sample input table looks like this:

timea   b   c
-
t0  a0  b0  1
t1  a0  b1  2
t2  a0  b2  3
t3  a0  b0  6
t4  a0  b1  7
t5  a0  b2  8

Basically, every time interval there are new readings from a fixed set of
sensors (b0, b1 and b2). All these rows have a few constant fields
representing metadata about the input (a0).

Desired output for every time interval is the average reading for every
sensor (b0, b1, b2), along with the constant metadata (a0):

a0b0avg(c)
a0b1avg(c)
a0b2avg(c)

This is what I was trying to build using a simple Tumble window:

input \
    .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.b) \
    .select(
        input.a,    # <=== constant metadata field, same for every input record
        input.b,    # <=== group_by field, to compute averages
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()

The example above is highly simplified, but I hope it explains what I'm
trying to achieve.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 3:21 PM Dian Fu  wrote:

> Hi Sumeet,
>
> 1) Regarding the above exception, it's a known issue and has been fixed
> in FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> [1]. It
> will be available in the coming 1.12.3. You could also cherry-pick that fix
> to 1.12.2 and build from source following the instruction described in [2]
> if needed.
>
> 2) Regarding your requirements, could you describe what you want to do
> with group window or over window?
> For group window(e.g. tumble window, hop window, session window, etc), it
> will output one row for multiple inputs belonging to the same window. You
> cannot simply pass it through from input to sink, as it is
> non-deterministic which row to use when there are multiple input rows. That's
> the reason why you have to declare a field in the group by clause if you
> want to access it directly in the select clause. For over window, it will
> output one row for each input and so you could pass through it directly.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21922.
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>
>
> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra  wrote:
>
> Thanks Guowei. I'm trying out Over Windows, as follows:
>
> input \
>     .over_window(
>         Over.partition_by(col(input.a)) \
>             .order_by(input.Timestamp) \
>             .preceding(lit(10).seconds) \
>             .alias('w')) \
>     .select(
>         input.b,
>         input.c.avg.over(col('w'))) \
>     .execute_insert('MySink') \
>     .wait()
>
> But running into following exception:
>
> py4j.protocol.Py4JError: An error occurred while calling
> z:org.apache.flink.table.api.Over.partitionBy. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>
> Is there any extra Jar that needs to be included for Over Windows? From
> the code it doesn't appear so.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma  wrote:
>
>> Hi, Sumeet
>>
>> Maybe you could try the Over Windows[1], which could keep the
>> "non-group-key" column.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <
>> sumeet.malho...@gmail.com> wrote:
>>
>>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause
>>> any issues. It's only when I want to use "input.b".
>>>
>>> My use case is to basically emit "input.b" in the final sink as is, and
>>> not really perform any aggregation on that column - more like pass through
>>> from input to sink. What's the best way to achieve this? I was thinking
>>> that making it part of the select() clause would do it, but as you said
>>> there needs to be some aggregation performed on it.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma  wrote:
>>>
>>>> Hi, Sumeet
>>>>   For "input.b" I think you should aggregate the non-group-key
>>>> column[1].
>>>> But I am not sure why the "input.c.avg.alias('avg_value')"  has
>>>> resolved errors. Would you mind giving more detailed error information?

Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
Thanks Guowei. I'm trying out Over Windows, as follows:

input \
    .over_window(
        Over.partition_by(col(input.a)) \
            .order_by(input.Timestamp) \
            .preceding(lit(10).seconds) \
            .alias('w')) \
    .select(
        input.b,
        input.c.avg.over(col('w'))) \
    .execute_insert('MySink') \
    .wait()

But running into following exception:

py4j.protocol.Py4JError: An error occurred while calling
z:org.apache.flink.table.api.Over.partitionBy. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method
partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist

Is there any extra Jar that needs to be included for Over Windows? From the
code it doesn't appear so.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma  wrote:

> Hi, Sumeet
>
> Maybe you could try the Over Windows[1], which could keep the
> "non-group-key" column.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra 
> wrote:
>
>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause
>> any issues. It's only when I want to use "input.b".
>>
>> My use case is to basically emit "input.b" in the final sink as is, and
>> not really perform any aggregation on that column - more like pass through
>> from input to sink. What's the best way to achieve this? I was thinking
>> that making it part of the select() clause would do it, but as you said
>> there needs to be some aggregation performed on it.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma  wrote:
>>
>>> Hi, Sumeet
>>>   For "input.b" I think you should aggregate the non-group-key
>>> column[1].
>>> But I am not sure why the "input.c.avg.alias('avg_value')"  has resolved
>>> errors. Would you mind giving more detailed error information?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <
>>> sumeet.malho...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a use case where I'm creating a Tumbling window as follows:
>>>>
>>>> "input" table has columns [Timestamp, a, b, c]
>>>>
>>>> input \
>>>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>     .group_by(col('w'), input.a) \
>>>>     .select(
>>>>         col('w').start.alias('window_start'),
>>>>         col('w').end.alias('window_end'),
>>>>         input.b,
>>>>         input.c.avg.alias('avg_value')) \
>>>>     .execute_insert('MySink') \
>>>>     .wait()
>>>>
>>>> This throws an exception that it cannot resolve the fields "b" and "c"
>>>> inside the select statement. If I mention these column names inside the
>>>> group_by() statement as follows:
>>>>
>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>
>>>> then the column names in the subsequent select statement can be
>>>> resolved.
>>>>
>>>> Basically, unless the column name is explicitly made part of the
>>>> group_by() clause, the subsequent select() clause doesn't resolve it. This
>>>> is very similar to the example from Flink's documentation here [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>>>> where a similar procedure works.
>>>>
>>>> Any idea how I can access columns from the input stream, without having
>>>> to mention them in the group_by() clause? I really don't want to group the
>>>> results by those fields, but they need to be written to the sink 
>>>> eventually.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>


Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any
issues. It's only when I want to use "input.b".

My use case is to basically emit "input.b" in the final sink as is, and not
really perform any aggregation on that column - more like pass through from
input to sink. What's the best way to achieve this? I was thinking that
making it part of the select() clause would do it, but as you said there
needs to be some aggregation performed on it.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma  wrote:

> Hi, Sumeet
>   For "input.b" I think you should aggregate the non-group-key
> column[1].
> But I am not sure why the "input.c.avg.alias('avg_value')"  has resolved
> errors. Would you mind giving more detailed error information?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> I have a use case where I'm creating a Tumbling window as follows:
>>
>> "input" table has columns [Timestamp, a, b, c]
>>
>> input \
>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>     .group_by(col('w'), input.a) \
>>     .select(
>>         col('w').start.alias('window_start'),
>>         col('w').end.alias('window_end'),
>>         input.b,
>>         input.c.avg.alias('avg_value')) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> This throws an exception that it cannot resolve the fields "b" and "c"
>> inside the select statement. If I mention these column names inside the
>> group_by() statement as follows:
>>
>> .group_by(col('w'), input.a, input.b, input.c)
>>
>> then the column names in the subsequent select statement can be resolved.
>>
>> Basically, unless the column name is explicitly made part of the
>> group_by() clause, the subsequent select() clause doesn't resolve it. This
>> is very similar to the example from Flink's documentation here [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>> where a similar procedure works.
>>
>> Any idea how I can access columns from the input stream, without having
>> to mention them in the group_by() clause? I really don't want to group the
>> results by those fields, but they need to be written to the sink eventually.
>>
>> Thanks,
>> Sumeet
>>
>


Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
Hi,

I have a use case where I'm creating a Tumbling window as follows:

"input" table has columns [Timestamp, a, b, c]

input \
.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
.group_by(col('w'), input.a) \
.select(
col('w').start.alias('window_start'),
col('w').end.alias('window_end'),
input.b,
input.c.avg.alias('avg_value')) \
.execute_insert('MySink') \
.wait()

This throws an exception that it cannot resolve the fields "b" and "c"
inside the select statement. If I mention these column names inside the
group_by() statement as follows:

.group_by(col('w'), input.a, input.b, input.c)

then the column names in the subsequent select statement can be resolved.

Basically, unless the column name is explicitly made part of the group_by()
clause, the subsequent select() clause doesn't resolve it. This is very
similar to the example from Flink's documentation here [1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
where a similar procedure works.

Any idea how I can access columns from the input stream, without having to
mention them in the group_by() clause? I really don't want to group the
results by those fields, but they need to be written to the sink eventually.

Thanks,
Sumeet


Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Additional observation: From the Flink repo, the file
"flink-python/pyflink/table/table.py" seems to support map(), flat_map()
and other row based operations although the 1.12 documentation doesn't
reflect that. Is that correct? From the code, it appears that these
operations are supported in Python.
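
If so, something along these lines is what I have in mind, written as a scalar
UDF used from the Table API (field names follow the Avro schema quoted below;
the JSON branch and 'some_field' are purely illustrative, and the Avro branch
would need the writer schema of 'Result'):

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def decode_result(result: bytes, result_encoding: str):
    # Interpret the embedded 'Result' bytes according to 'ResultEncoding'.
    if result_encoding == 'application/json':
        payload = json.loads(result.decode('utf-8'))
        return str(payload.get('some_field'))   # 'some_field' is a placeholder
    if result_encoding == 'application/avro':
        # e.g. decode with fastavro using the known schema of 'Result'
        raise NotImplementedError('Avro decoding goes here')
    return None

# Usage: table.select(decode_result(table.Result, table.ResultEncoding))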

Thanks,
Sumeet

On Thu, Apr 15, 2021 at 6:31 AM Sumeet Malhotra 
wrote:

> Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly
> Table APIs. The documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations)
> suggests that Map() function is not currently supported in Python. So, what
> do you think would be my options here. Should I convert to a data stream to
> perform this in Python?
>
> Thanks again,
> Sumeet
>
>
> On Wed, Apr 14, 2021 at 7:09 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> One thing that you can do is to read this record using Avro keeping
>> `Result` as `bytes` and in a subsequent mapping function, you could change
>> the record type and deserialize the result. In Data Stream API:
>>
>> source.map(new MapFunction<record_with_raw_result, record_with_deserialized_result>() { ... })
>>
>> Best,
>> Piotrek
>>
>> On Wed, Apr 14, 2021 at 03:17, Sumeet Malhotra 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm reading data from Kafka, which is Avro encoded and has the following
>>> general schema:
>>>
>>> {
>>>   "name": "SomeName",
>>>   "doc": "Avro schema with variable embedded encodings",
>>>   "type": "record",
>>>   "fields": [
>>> {
>>>   "name": "Name",
>>>   "doc": "My name",
>>>   "type": "string"
>>> },
>>> {
>>>   "name": "ID",
>>>   "doc": "My ID",
>>>   "type": "string"
>>> },
>>> {
>>>   "name": "Result",
>>>   "doc": "Result data, could be encoded differently",
>>>   "type": "bytes"
>>> },
>>> {
>>>   "name": "ResultEncoding",
>>>   "doc": "Result encoding media type (e.g. application/avro,
>>> application/json)",
>>>   "type": "string"
>>> },
>>>   ]
>>> }
>>>
>>> Basically, the "Result" field is bytes whose interpretation depends upon
>>> the "ResultEncoding" field i.e. either avro or json. The "Result" byte
>>> stream has its own well defined schema also.
>>>
>>> My use case involves extracting/aggregating data from within the
>>> embedded "Result" field. What would be the best approach to perform this
>>> runtime decoding and extraction of fields from the embedded byte data?
>>> Would user defined functions help in this case?
>>>
>>> Thanks in advance!
>>> Sumeet
>>>
>>>


Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly Table
APIs. The documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations)
suggests that Map() function is not currently supported in Python. So, what
do you think would be my options here. Should I convert to a data stream to
perform this in Python?

Thanks again,
Sumeet


On Wed, Apr 14, 2021 at 7:09 PM Piotr Nowojski  wrote:

> Hi,
>
> One thing that you can do is to read this record using Avro keeping
> `Result` as `bytes` and in a subsequent mapping function, you could change
> the record type and deserialize the result. In Data Stream API:
>
> source.map(new MapFunction<record_with_raw_result, record_with_deserialized_result>() { ... })
>
> Best,
> Piotrek
>
> On Wed, Apr 14, 2021 at 03:17, Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> I'm reading data from Kafka, which is Avro encoded and has the following
>> general schema:
>>
>> {
>>   "name": "SomeName",
>>   "doc": "Avro schema with variable embedded encodings",
>>   "type": "record",
>>   "fields": [
>> {
>>   "name": "Name",
>>   "doc": "My name",
>>   "type": "string"
>> },
>> {
>>   "name": "ID",
>>   "doc": "My ID",
>>   "type": "string"
>> },
>> {
>>   "name": "Result",
>>   "doc": "Result data, could be encoded differently",
>>   "type": "bytes"
>> },
>> {
>>   "name": "ResultEncoding",
>>   "doc": "Result encoding media type (e.g. application/avro,
>> application/json)",
>>   "type": "string"
>> },
>>   ]
>> }
>>
>> Basically, the "Result" field is bytes whose interpretation depends upon
>> the "ResultEncoding" field i.e. either avro or json. The "Result" byte
>> stream has its own well defined schema also.
>>
>> My use case involves extracting/aggregating data from within the embedded
>> "Result" field. What would be the best approach to perform this runtime
>> decoding and extraction of fields from the embedded byte data? Would user
>> defined functions help in this case?
>>
>> Thanks in advance!
>> Sumeet
>>
>>


Extract/Interpret embedded byte data from a record

2021-04-13 Thread Sumeet Malhotra
Hi,

I'm reading data from Kafka, which is Avro encoded and has the following
general schema:

{
  "name": "SomeName",
  "doc": "Avro schema with variable embedded encodings",
  "type": "record",
  "fields": [
    {
      "name": "Name",
      "doc": "My name",
      "type": "string"
    },
    {
      "name": "ID",
      "doc": "My ID",
      "type": "string"
    },
    {
      "name": "Result",
      "doc": "Result data, could be encoded differently",
      "type": "bytes"
    },
    {
      "name": "ResultEncoding",
      "doc": "Result encoding media type (e.g. application/avro, application/json)",
      "type": "string"
    }
  ]
}

Basically, the "Result" field is bytes whose interpretation depends upon
the "ResultEncoding" field i.e. either avro or json. The "Result" byte
stream has its own well defined schema also.

My use case involves extracting/aggregating data from within the embedded
"Result" field. What would be the best approach to perform this runtime
decoding and extraction of fields from the embedded byte data? Would user
defined functions help in this case?

Thanks in advance!
Sumeet


Re: Avro schema

2021-04-13 Thread Sumeet Malhotra
Hi Arvid,

I certainly appreciate the points you make regarding schema evolution.
Actually, I did end up writing an avro2sql script to autogen the DDL in the
end.
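
For what it's worth, the core of that script is little more than a type mapping
plus string templating, roughly like the sketch below (heavily simplified;
unions, logical types and nested records need extra handling):

import json

# Minimal Avro-to-Flink type mapping; anything unknown falls back to STRING.
AVRO_TO_FLINK = {
    'string': 'STRING', 'bytes': 'BYTES', 'int': 'INT', 'long': 'BIGINT',
    'float': 'FLOAT', 'double': 'DOUBLE', 'boolean': 'BOOLEAN',
}

def avro_to_ddl(avro_schema: str, table_name: str, with_clause: str) -> str:
    fields = json.loads(avro_schema)['fields']
    columns = ',\n  '.join(
        '`{}` {}'.format(f['name'], AVRO_TO_FLINK.get(f['type'], 'STRING'))
        for f in fields)
    return 'CREATE TABLE {} (\n  {}\n) WITH ({})'.format(
        table_name, columns, with_clause)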

Thanks,
Sumeet

On Fri, Apr 9, 2021 at 12:13 PM Arvid Heise  wrote:

> Hi Sumeet,
>
> The beauty of Avro lies in having reader and writer schema and schema
> compatibility, such that if your schema evolves over time (which will
> happen in streaming naturally but is also very common in batch), you can
> still use your application as is without modification. For streaming, this
> methodology also implies that you can process elements with different
> schema versions in the same run, which is mandatory for any non-toy example.
>
> If you read into this topic, you will realize that it doesn't make sense
> to read from Avro without specifying your reader schema (except for some
> generic applications, but they should be written in DataStream). If you
> keep in mind that your same dataset could have different schemas, you will
> notice that your ideas quickly reach some limitations (which schema to
> take?). What you could do, is to write a small script to generate the
> schema DDL from your current schema in your actual data if you have very
> many columns and datasets. It certainly would also be an interesting idea
> to pass a static Avro/Json schema to the DDL.
>
> On Fri, Apr 2, 2021 at 10:57 AM Paul Lam  wrote:
>
>> Hi Sumeet,
>>
>> I’m not a Table/SQL API expert, but from my knowledge, it’s not viable to
>> derive SQL table schemas from Avro schemas, because table schemas would be
>> the ground truth by design.
>> Moreover, one Avro type can be mapped to multiple Flink types, so in
>> practice maybe it’s also not viable.
>>
>> Best,
>> Paul Lam
>>
>> On Apr 2, 2021, at 11:34, Sumeet Malhotra  wrote:
>>
>> Just realized, my question was probably not clear enough. :-)
>>
>> I understand that the Avro (or JSON for that matter) format can be
>> ingested as described here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#apache-avro-format,
>> but this still requires the entire table specification to be written in the
>> "CREATE TABLE" section. Is it possible to just specify the Avro schema and
>> let Flink map it to an SQL table?
>>
>> BTW, the above link is titled "Table API Legacy Connectors", so is this
>> still supported? Same question for YAML specification.
>>
>> Thanks,
>> Sumeet
>>
>> On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra 
>> wrote:
>>
>>> Hi,
>>>
>>> Is it possible to directly import Avro schema while ingesting data into
>>> Flink? Or do we always have to specify the entire schema in either SQL DDL
>>> for Table API or using DataStream data types? From a code maintenance
>>> standpoint, it would be really helpful to keep one source of truth for the
>>> schema somewhere.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>
>>


Re: Avro schema

2021-04-01 Thread Sumeet Malhotra
Just realized, my question was probably not clear enough. :-)

I understand that the Avro (or JSON for that matter) format can be ingested
as described here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#apache-avro-format,
but this still requires the entire table specification to be written in the
"CREATE TABLE" section. Is it possible to just specify the Avro schema and
let Flink map it to an SQL table?

BTW, the above link is titled "Table API Legacy Connectors", so is this
still supported? Same question for YAML specification.

Thanks,
Sumeet

On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra 
wrote:

> Hi,
>
> Is it possible to directly import Avro schema while ingesting data into
> Flink? Or do we always have to specify the entire schema in either SQL DDL
> for Table API or using DataStream data types? From a code maintenance
> standpoint, it would be really helpful to keep one source of truth for the
> schema somewhere.
>
> Thanks,
> Sumeet
>


Avro schema

2021-04-01 Thread Sumeet Malhotra
Hi,

Is it possible to directly import Avro schema while ingesting data into
Flink? Or do we always have to specify the entire schema in either SQL DDL
for Table API or using DataStream data types? From a code maintenance
standpoint, it would be really helpful to keep one source of truth for the
schema somewhere.

Thanks,
Sumeet


PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Sumeet Malhotra
Cross-posting from StackOverflow here:

https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array

Any pointers are appreciated!

Thanks,
Sumeet


Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-31 Thread Sumeet Malhotra
Thanks Dawid. This looks like what I needed :-)

On Tue, Mar 30, 2021 at 12:28 PM Dawid Wysakowicz 
wrote:

> Hey,
>
> I am not sure which format you use, but if you work with JSON maybe this
> option[1] could help you.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard
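
For the record, a sketch of the kind of DDL this option enables is below
(t_env is a TableEnvironment, the connector properties are placeholders, and the
precision and zone handling may still need tweaking for microsecond 'Z'
timestamps like the one above):

t_env.execute_sql("""
    CREATE TABLE my_source (
        `payload` STRING,
        `event_time` TIMESTAMP(3),
        WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")
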
> On 30/03/2021 06:45, Sumeet Malhotra wrote:
>
> Thanks. Yes, that's a possibility. I'd still prefer something that can be
> done within the Table API. If it's not possible, then there's no other
> option but to use the DataStream API to read from Kafka, do the time
> conversion and create a table from it.
>
> ..Sumeet
>
> On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I hope someone else might have a better answer, but one thing that would
>> most likely work is to convert this field and define even time during
>> DataStream to table conversion [1]. You could always pre process this field
>> in the DataStream API.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>>
>> On Mon, Mar 29, 2021 at 18:07, Sumeet Malhotra 
>> wrote:
>>
>>> Hi,
>>>
>>> Might be a simple, stupid question, but I'm not able to find how to
>>> convert/interpret a UTC datetime string like
>>> *2021-03-23T07:37:00.613910Z* as event-time using a DDL/Table API. I'm
>>> ingesting data from Kafka and can read this field as a string, but would
>>> like to mark it as event-time by defining a watermark.
>>>
>>> I'm able to achieve this using the DataStream API, by defining my own
>>> TimestampAssigner that converts the datetime string to milliseconds since
>>> epoch. How can I do this using a SQL DDL or Table API?
>>>
>>> I tried to directly interpret the string as TIMESTAMP(3) but it fails
>>> with the following exception:
>>>
>>> java.time.format.DateTimeParseException: Text
>>> '2021-03-23T07:37:00.613910Z' could not be parsed...
>>>
>>> Any pointers?
>>>
>>> Thanks!
>>> Sumeet
>>>
>>>


Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Sumeet Malhotra
Thanks. Yes, that's a possibility. I'd still prefer something that can be
done within the Table API. If it's not possible, then there's no other
option but to use the DataStream API to read from Kafka, do the time
conversion and create a table from it.

..Sumeet

On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski 
wrote:

> Hi,
>
> I hope someone else might have a better answer, but one thing that would
> most likely work is to convert this field and define even time during
> DataStream to table conversion [1]. You could always pre process this field
> in the DataStream API.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>
> On Mon, Mar 29, 2021 at 18:07, Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> Might be a simple, stupid question, but I'm not able to find how to
>> convert/interpret a UTC datetime string like
>> *2021-03-23T07:37:00.613910Z* as event-time using a DDL/Table API. I'm
>> ingesting data from Kafka and can read this field as a string, but would
>> like to mark it as event-time by defining a watermark.
>>
>> I'm able to achieve this using the DataStream API, by defining my own
>> TimestampAssigner that converts the datetime string to milliseconds since
>> epoch. How can I do this using a SQL DDL or Table API?
>>
>> I tried to directly interpret the string as TIMESTAMP(3) but it fails
>> with the following exception:
>>
>> java.time.format.DateTimeParseException: Text
>> '2021-03-23T07:37:00.613910Z' could not be parsed...
>>
>> Any pointers?
>>
>> Thanks!
>> Sumeet
>>
>>


PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Sumeet Malhotra
Hi,

Might be a simple, stupid question, but I'm not able to find how to
convert/interpret a UTC datetime string like *2021-03-23T07:37:00.613910Z*
as event-time using a DDL/Table API. I'm ingesting data from Kafka and can
read this field as a string, but would like to mark it as event-time by
defining a watermark.

I'm able to achieve this using the DataStream API, by defining my own
TimestampAssigner that converts the datetime string to milliseconds since
epoch. How can I do this using a SQL DDL or Table API?

I tried to directly interpret the string as TIMESTAMP(3) but it fails with
the following exception:

java.time.format.DateTimeParseException: Text '2021-03-23T07:37:00.613910Z'
could not be parsed...

Any pointers?

Thanks!
Sumeet


Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Sumeet Malhotra
Apologies. I meant `StreamTableEnvironment.to_append_stream` in my last
message.

On Mon, Mar 22, 2021 at 2:03 PM Sumeet Malhotra 
wrote:

> Thanks Dian.
>
> Another question I have is, since PyFlink Datastream API still doesn't
> have native Window support, what's the recommended way to introduce
> windows? Use PyFlink Table API for windows in conjunction with the
> Datastream APIs? For example, read input records from Kafka into a table
> and then use `StreamTableEnvironment.to_apend_table(...)` to push records
> into a data stream?
>
> Thanks,
> Sumeet
>
>
> On Mon, Mar 22, 2021 at 1:05 PM Dian Fu  wrote:
>
>> Hi Sumeet,
>>
>> It still doesn't support side outputs in PyFlink.
>>
>> >> Or do I have to replicate the input datastream and then apply record
>> specific filters?
>> I'm afraid that yes.
>>
>> Regards,
>> Dian
>>
>>
>> On Sun, Mar 21, 2021 at 5:20 PM Sumeet Malhotra <
>> sumeet.malho...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a use case where I need to process incoming records on a Kafka
>>> topic based on a certain record field that defines the record type.
>>>
>>> What I'm thinking is to split the incoming datastream into record-type
>>> specific streams and then apply record-type specific stream processing on
>>> each. What's the recommended way to achieve this in PyFlink? Are side
>>> outputs supported in PyFlink (I couldn't find any reference in the
>>> codebase)? Or do I have to replicate the input datastream and then apply
>>> record specific filters?
>>>
>>> Thanks,
>>> Sumeet
>>>
>>


Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Sumeet Malhotra
Thanks Dian.

Another question I have is, since PyFlink Datastream API still doesn't have
native Window support, what's the recommended way to introduce windows? Use
PyFlink Table API for windows in conjunction with the Datastream APIs? For
example, read input records from Kafka into a table and then use
`StreamTableEnvironment.to_apend_table(...)` to push records into a data
stream?

Thanks,
Sumeet


On Mon, Mar 22, 2021 at 1:05 PM Dian Fu  wrote:

> Hi Sumeet,
>
> It still doesn't support side outputs in PyFlink.
>
> >> Or do I have to replicate the input datastream and then apply record
> specific filters?
> I'm afraid that yes.
>
> Regards,
> Dian
>
>
> On Sun, Mar 21, 2021 at 5:20 PM Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> I have a use case where I need to process incoming records on a Kafka
>> topic based on a certain record field that defines the record type.
>>
>> What I'm thinking is to split the incoming datastream into record-type
>> specific streams and then apply record-type specific stream processing on
>> each. What's the recommended way to achieve this in PyFlink? Are side
>> outputs supported in PyFlink (I couldn't find any reference in the
>> codebase)? Or do I have to replicate the input datastream and then apply
>> record specific filters?
>>
>> Thanks,
>> Sumeet
>>
>


Recommended way to split datastream in PyFlink

2021-03-21 Thread Sumeet Malhotra
Hi,

I have a use case where I need to process incoming records on a Kafka topic
based on a certain record field that defines the record type.

What I'm thinking is to split the incoming datastream into record-type
specific streams and then apply record-type specific stream processing on
each. What's the recommended way to achieve this in PyFlink? Are side
outputs supported in PyFlink (I couldn't find any reference in the
codebase)? Or do I have to replicate the input datastream and then apply
record specific filters?
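
The replicate-and-filter version I have in mind looks roughly like this (the
record layout and the in-memory source are placeholders for the Kafka consumer):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Placeholder source; in practice this would be the Kafka consumer.
records = env.from_collection(
    [('type_a', 1), ('type_b', 2), ('type_a', 3)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

# Each branch reads the same upstream and keeps only its record type.
stream_a = records.filter(lambda r: r[0] == 'type_a')
stream_b = records.filter(lambda r: r[0] == 'type_b')

stream_a.print()
stream_b.print()
env.execute('split_by_record_type')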

Thanks,
Sumeet