Thanks very much, given that I'm using SQL it doesn't look like I'm able to access the operators to be able to change the parallelism without dropping from the table API into the datastream API and then back again. In any case, conversion between a tablesteam and a datastream is broken if the tablestream contains a timestamp, which I reported a little while ago and Dian filed as FLINK-28253.
Kind regards, John ________________________________ From: Juntao Hu <maybach...@gmail.com> Sent: 18 July 2022 04:13 To: John Tipper <john_tip...@hotmail.com> Cc: user@flink.apache.org <user@flink.apache.org> Subject: Re: PyFlink and parallelism It's not the issue with Python-Java object conversion, you get a DataStream rather than SingleOutputStreamOperator underlying the Python DataStream wrapper after calling `to_data_stream`, and `setParallelism` is only available on SingleOutputStreamOperator. To work around this, change `set_parallelism` to your processing operator, e.g. `filtered_stream.map(...).set_parallelism`. John Tipper <john_tip...@hotmail.com<mailto:john_tip...@hotmail.com>> 于2022年7月16日周六 17:07写道: I've tried this and can see there appears to be a bigger problem with PyFlink and a call to set_parallelism(): events_table = table_env.from_path(MY_SOURCE_TABLE) filtered_table = events_table.filter( col("event_type") == "event_of_interest" ) filtered_stream = table_env.to_data_stream(filtered_table) # fetch desired parallelism from config, don't hardcode filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism', 1))) table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream) # now execute SQL onMY_FILTERED_VIEW table_env.execute_sql(...) I now get an error: An error occurred when calling o68.setParallelism(). Trace: org.apache.flink.api.python.shaded.py4j.Py4JException: Method setParallelism([class java.lang.Integer]) does not exist. Looks like Python is converting to the Integer object in Java and not the int primitive. I actually see this if I just call set_parallelism(1) without the call to get_config(). Is this a bug or is there a workaround? ________________________________ From: John Tipper Sent: 15 July 2022 16:44 To: user@flink.apache.org<mailto:user@flink.apache.org> <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: PyFlink and parallelism Hi all, I have a processing topology using PyFlink and SQL where there is data skew: I'm splitting a stream of heterogenous data into separate streams based on the type of data that's in it and some of these substreams have very many more events than others and this is causing issues when checkpointing (checkpoints are timing out). I'd like to increase parallelism for these problematic streams, I'm just not sure how I do that and target just those elements. Do I need to use the datastream API here? What does this look like please? I have a table defined and I duplicate a stream from that table, then filter so that my substream has only the events I'm interested in: events_table = table_env.from_path(MY_SOURCE_TABLE) filtered_table = events_table.filter( col("event_type") == "event_of_interest" ) table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table) # now execute SQL on MY_FILTERED_VIEW table_env.execute_sql(...) The default parallelism of the overall table env is 1. Is there a way to increase the parallelism for just this stream? Many thanks, John