Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink. The goal is to train models in parallel for independent time series in the same data stream. For that purpose I'm using a Python library, which led me to PyFlink.

Let me explain the use case a bit more. I want to implement a batch job which partitions/groups the data by a device identifier and then processes the data for each device all at once; unfortunately there is no way to train the model iteratively. The challenge I'm facing is to guarantee that all data belonging to a certain device is processed in one single step. I'm aware that this does not scale well, but for a reasonable amount of input data per device it should be fine from my perspective.

After a lot of investigation I ended up using the Table API and Pandas UDFs, which roughly fulfil my requirements, but the following limitations remain, which I wanted to talk about:
1. A Pandas UDF takes multiple Series as input parameters, which is fine for my purpose, but as far as I can see there is no way to guarantee that the chunk of data in a Series is "complete". Flink will slice the Series and may call the UDF multiple times for each device. There are config options like "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time" which might help, but I'm not sure whether this is the right path to take. (See the first sketch below the source code for how I would set them.)

2. The input Series and the output Series need to have the same length, which isn't nice for my use case. What I would like to do is to process n rows and emit m rows; there shouldn't be any dependency between the number of input rows and the number of output rows.

3. How do I partition the data stream? The Table API offers a group_by, but this doesn't serve my purpose, because I don't want to aggregate all the grouped rows. Instead, as stated above, I want to emit m result rows per group. Are there other options in the Table API or any other API to do this kind of grouping? I would need something like keyBy() from the streaming API. Maybe the APIs can be combined? Can I create a separate table for each key? (A plain pandas sketch of the semantics I'm after follows below the source code.)

I'm also open to ideas for a completely different approach that doesn't use the Table API or Pandas UDFs. Any idea is welcome. You can find a condensed version of the source code attached.

Kind Regards,
Niklas

#############################################################
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):
    # Train the model and create the forecast (the model code is condensed away).
    # forecast_df is the DataFrame produced by the model; input_size equals the
    # length of the input Series, since input and output must have the same size.
    yhat_ts = forecast_df['yhat'].tail(input_size)
    return yhat_ts

t_env.register_function("forcast", forcast)

# Define sink and source here
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
    .where("riid === 'r1i1'") \
    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
    .insert_into('mySink')

t_env.execute("pandas_udf_demo")
#############################################################
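
For reference, this is roughly how I would set the configuration options mentioned in point 1 (just a sketch; I have not verified that larger Arrow batches actually guarantee complete per-device groups, and the concrete values are arbitrary):

#############################################################
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
config = t_env.get_config().get_configuration()

# Maximum number of rows per Arrow batch handed to the Pandas UDF
config.set_integer("python.fn-execution.arrow.batch.size", 100000)
# Maximum time (in milliseconds) before a bundle is flushed to the Python worker
config.set_integer("python.fn-execution.bundle.time", 60000)
#############################################################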
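
And to make point 3 more concrete, here is the semantics I'm after expressed in plain pandas (not Flink code, just a sketch; train_and_forecast is a dummy placeholder for the actual model training):

#############################################################
import pandas as pd

def train_and_forecast(device_df):
    # Placeholder: fit a model on *all* rows of one device and return an
    # arbitrary number of forecast rows for that device (m != n is allowed).
    horizon = 3
    return pd.DataFrame({
        'riid': device_df['riid'].iloc[0],
        'yhat': [device_df['y'].mean()] * horizon,  # dummy forecast
    })

df = pd.DataFrame({
    'riid': ['r1i1', 'r1i1', 'r2i1'],
    'ds':   [1.0, 2.0, 1.0],
    'y':    [10.0, 12.0, 5.0],
})

# groupby + per-group call is exactly the guarantee I need from Flink:
# every call sees the complete data of one device, and each group may
# produce a different number of output rows.
result = pd.concat(train_and_forecast(group) for _, group in df.groupby('riid'))
print(result)
#############################################################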