Hi Mahima,

looks like you found the relevant parts of the code already: in JarHandlerUtils.JarHandlerContext#fromRequest, the parallelism value is extracted from the request body or from the query parameter (the latter is deprecated, though). If neither is set, the value defaults to 1, and applyToConfiguration then overwrites the configured value of 3 with it. From what I could gather, this is the case for legacy reasons (backwards compatibility). I am afraid that when you execute a job by running a jar that was previously uploaded via the jar upload, the default parallelism from the config will not be considered.
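To make the order of precedence concrete, here is a minimal sketch of the behaviour described above. It is a simplified model and not Flink's actual code: the class name, the method signatures, and the use of a plain Map in place of Flink's Configuration are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the fallback described above.
// This is NOT the real JarHandlerUtils.JarHandlerContext implementation.
class ParallelismFallbackSketch {

    // Value used when the request carries no parallelism at all.
    static final int FALLBACK_PARALLELISM = 1;

    // The request body wins, then the (deprecated) query parameter,
    // and only then the hard-coded fallback of 1.
    static int resolveParallelism(Integer fromBody, Integer fromQuery) {
        if (fromBody != null) {
            return fromBody;
        }
        if (fromQuery != null) {
            return fromQuery;
        }
        return FALLBACK_PARALLELISM;
    }

    // Models the overwrite step: the resolved value replaces whatever
    // parallelism.default was loaded from the configuration file.
    static Map<String, String> applyToConfiguration(Map<String, String> config, int parallelism) {
        Map<String, String> result = new HashMap<>(config);
        result.put("parallelism.default", String.valueOf(parallelism));
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("parallelism.default", "3"); // as set in flink-conf.yaml

        // No parallelism in the request: the configured 3 is replaced by 1.
        Map<String, String> withoutValue =
                applyToConfiguration(config, resolveParallelism(null, null));
        System.out.println(withoutValue.get("parallelism.default"));

        // Parallelism given in the request body: that value is used instead.
        Map<String, String> withValue =
                applyToConfiguration(config, resolveParallelism(3, null));
        System.out.println(withValue.get("parallelism.default"));
    }
}
```

If this model matches what the handler does, then passing the parallelism explicitly in the request body of the jar run call (for example "parallelism": 3) should avoid the fallback to 1. Please treat this as a suggestion to verify against your Flink version rather than as confirmed behaviour.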
Best regards,
Nico

On Wed, Jul 7, 2021 at 8:18 AM Mahima Agarwal <mahi.29agar...@gmail.com> wrote:

> Hi Team,
>
> Please find the query below.
>
> Use Case: Using parallelism.default property mentioned in flink-conf.yaml
> file to enable system-level parallelism in flink configuration.
>
> Issue: Even after setting the parallelism.default to 3, on config start
> the configuration starts with parallelism as 1.
>
> On debugging the code we found that the value of parallelism.default in
> Configuration object instantiated inside handleRequest() method of
> JarRunHandler class(Line Number - 90) is initially set to 3 but it is
> changed to 1 in applyToConfiguration method of
> JarHandlerUtils.JarHandlerContext class(Line Number - 132) which is called
> from handleRequest method of JarRunHandler(Line Number - 95).
>
> Flink Version - 1.12.1
> Job Code -
>
> public class FlinkJob
> {
>
>     public static void main(String[] args) throws Exception
>     {
>
>         String TOPIC_IN = args[0];
>         String TOPIC_OUT = args[1];
>         String BOOTSTRAP_SERVER = args[2];
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         Properties props = new Properties();
>         props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>         props.put("group.id","kc1");
>
>         KafkaDeserializationSchema<Object> deserializationSchema =
>             new MMDeserializer<>("org.gamezone.GameZoneInput");
>
>         FlinkKafkaConsumer<Object> kafkaConsumer =
>             new FlinkKafkaConsumer<>(TOPIC_IN, deserializationSchema, props);
>         kafkaConsumer.setStartFromLatest();
>
>         Properties prodProps = new Properties();
>         prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);
>
>         FlinkKafkaProducer<Object> kafkaProducer =
>             new FlinkKafkaProducer<>(TOPIC_OUT,
>                 ((value, timestamp) -> new ProducerRecord<>(TOPIC_OUT,
>                     "myKey".getBytes(), value.toString().getBytes())),
>                 prodProps,
>                 Semantic.AT_LEAST_ONCE);
>
>         DataStream<Object> stream = env.addSource(kafkaConsumer);
>         KeySelector<Object,Serializable> keys = create();
>         KeyedStream<Object, Serializable> playerId = stream.keyBy(keys);
>
>         playerId.process(new KeyedAggregateFunction(),
>             TypeInformation.of(new TypeHint<Object>() {}))
>             .addSink(kafkaProducer);
>
>         env.execute();
>     }
>
>     public static KeySelector<Object, Serializable> create()
>     {
>         return record ->
>         {
>             final Serializable key = ((GameZoneInput)record).getPlayerId();
>             return key;
>         };
>     }
>
> Any leads would be appreciated.
>
> Thanks
> Mahima