Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink.
The goal is to train models in parallel for independent time series in the same
data stream. For that purpose I'm using a Python library, which led me to
PyFlink. Let me explain the use case a bit more.
I want to implement a batch job that partitions/groups the data by a device
identifier and then processes all the data belonging to a device at once.
Unfortunately, there is no way to train the model iteratively. The challenge
I'm facing is guaranteeing that all data belonging to a certain device is
processed in a single step. I'm aware that this does not scale well, but for a
reasonable amount of input data per device it should be fine from my
perspective.
I investigated a lot and ended up using the Table API with a Pandas UDF, which
roughly fulfils my requirements, but the following limitations remain, which I
would like to discuss.

1. A Pandas UDF takes multiple Series as input parameters, which is fine for my
purpose, but as far as I can see there is no way to guarantee that the chunk of
data in a Series is "complete". Flink will slice the Series and may call the
UDF multiple times per device. There are config options like
"python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time"
which might help (see the sketch right below), but I'm not sure whether this is
the right path to take.
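
For illustration, this is roughly how I would set those options. The values are
just placeholders; part of my question is whether raising them actually
guarantees that all rows of one device arrive in a single UDF call.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

config = t_env.get_config().get_configuration()
# Example values only: allow larger Arrow batches and longer bundles in the
# hope that all rows of one device end up in a single UDF invocation.
config.set_string("python.fn-execution.arrow.batch.size", "100000")
config.set_string("python.fn-execution.bundle.time", "60000")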

2. The length of the input Series needs to be the same as the length of the
output Series, which isn't nice for my use case. What I would like to do is
process n rows and emit m rows; there shouldn't be any dependency between the
number of input rows and the number of output rows (see the small illustration
right below).
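
Just to illustrate what I mean with a made-up example (not my real model): a
scalar Pandas UDF has to return exactly as many rows as it receives, so a
forecast of a different length has to be truncated or padded to fit.

import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.FLOAT()], result_type=DataTypes.FLOAT(),
     udf_type='pandas')
def fit_to_input_length(y):
    forecast = pd.Series([1.0, 2.0, 3.0])  # placeholder forecast of length m
    # reindex forces the result back to length n (missing values become NaN),
    # because returning a Series of a different length is not allowed
    return forecast.reindex(range(len(y)))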

3. How do I partition the data stream? The Table API offers a group_by, but
this doesn't serve my purpose, because I don't want to aggregate all the
grouped rows; instead, as stated above, I want to emit m result rows per group.
Are there other options in the Table API, or any other API, to do this kind of
grouping? I would need something like keyBy() from the streaming API (see the
sketch right below). Maybe this can be combined with the Table API? Or can I
create a separate table for each key?
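
Conceptually I'm thinking of something like the following, assuming a Flink
version where the Python DataStream API with key_by is available (names and
data are made up):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Made-up records of (device id, measurement)
ds = env.from_collection(
    [('r1i1', 1.0), ('r1i1', 2.0), ('r2i1', 3.0)],
    type_info=Types.TUPLE([Types.STRING(), Types.FLOAT()]))

# Partition by the device identifier so that all records of one device end up
# in the same parallel instance; the open question is how to then hand the
# complete group to the model training in one call.
keyed = ds.key_by(lambda record: record[0])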

I'm also open to ideas for a completely different approach not using the Table 
API or Pandas UDF. Any idea is welcome.

You can find a condensed version of the source code below.

Kind Regards,
Niklas



#############################################################

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):
    
    # Train the model and create the forecast here (omitted); forecast_df is a
    # placeholder for the model output with a 'yhat' column.
    input_size = len(ds_float_series)

    # The output Series must be as long as the input, so trim the forecast.
    yhat_ts = forecast_df['yhat'].tail(input_size)
    return yhat_ts

t_env.register_function("forcast", forcast)

# Source and sink DDL strings (my_source_ddl / my_sink_ddl) are defined here

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
    .where("riid === 'r1i1'") \
    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
    .insert_into('mySink')

t_env.execute("pandas_udf_demo")

#############################################################

