Hi Mahima,

looks like you found the relevant parts of the code already: in
JarHandlerUtils.JarHandlerContext#fromRequest, the parallelism value is
extracted from the request body or from a query parameter (the latter is
deprecated, though). If neither is present, it defaults to 1 and overwrites
the configured value of 3 in applyToConfiguration.
From what I could gather, this is the case for legacy reasons (backwards
compatibility). I am afraid that when executing a job from a jar that was
previously uploaded via the jar upload endpoint, the default parallelism
from the config will not be considered.
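To make the fallback chain concrete, here is a minimal sketch (plain Java, not Flink's actual code; all names here are illustrative) of the behaviour described above:

```java
import java.util.Optional;

public class ParallelismFallbackSketch {
    static final int FALLBACK_PARALLELISM = 1; // hard-coded fallback

    // Mirrors the idea of fromRequest: the body value wins, then the
    // deprecated query parameter, then the fallback of 1 -- note that the
    // flink-conf.yaml value is never consulted at this point.
    static int resolveParallelism(Optional<Integer> bodyValue,
                                  Optional<Integer> queryParam) {
        return bodyValue.orElse(queryParam.orElse(FALLBACK_PARALLELISM));
    }

    // Mirrors the effect of applyToConfiguration: the resolved value
    // overwrites whatever parallelism.default was configured (e.g. 3).
    static int applyToConfiguration(int configuredDefault, int resolved) {
        return resolved; // configuredDefault is discarded
    }

    public static void main(String[] args) {
        int configured = 3; // parallelism.default from flink-conf.yaml
        int resolved = resolveParallelism(Optional.empty(), Optional.empty());
        // No value in the request, so the configured 3 is replaced by 1:
        System.out.println(applyToConfiguration(configured, resolved));
    }
}
```

As a workaround, explicitly passing the desired parallelism with the run request (the `parallelism` field of the jar run REST request body) should avoid the fallback to 1.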

Best regards,
Nico

On Wed, Jul 7, 2021 at 8:18 AM Mahima Agarwal <mahi.29agar...@gmail.com>
wrote:

> Hi Team,
>
> Please find the query below.
>
> Use Case: Using parallelism.default property mentioned in flink-conf.yaml
> file to enable system-level parallelism in flink configuration.
>
> Issue: Even after setting parallelism.default to 3 in flink-conf.yaml, the
> job starts with a parallelism of 1.
>
> On debugging the code, we found that the value of parallelism.default in
> the Configuration object instantiated inside the handleRequest() method of
> the JarRunHandler class (line 90) is initially set to 3, but it is changed
> to 1 in the applyToConfiguration method of the
> JarHandlerUtils.JarHandlerContext class (line 132), which is called from
> the handleRequest method of JarRunHandler (line 95).
>
> Flink Version - 1.12.1
> Job Code -
>
> // Imports for Flink 1.12; MMDeserializer, GameZoneInput and
> // KeyedAggregateFunction are project-specific classes.
> import java.io.Serializable;
> import java.util.Properties;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class FlinkJob
> {
>
>     public static void main(String[] args) throws Exception
>     {
>
>         String TOPIC_IN = args[0];
>         String TOPIC_OUT = args[1];
>         String BOOTSTRAP_SERVER = args[2];
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         Properties props = new Properties();
>         props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>         props.put("group.id","kc1");
>
>         KafkaDeserializationSchema<Object> deserializationSchema = new
> MMDeserializer<>("org.gamezone.GameZoneInput");
>
>         FlinkKafkaConsumer<Object> kafkaConsumer = new
> FlinkKafkaConsumer<>(TOPIC_IN, deserializationSchema, props);
>         kafkaConsumer.setStartFromLatest();
>
>         Properties prodProps = new Properties();
>         prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);
>
>         FlinkKafkaProducer<Object> kafkaProducer =
>                 new FlinkKafkaProducer<>(TOPIC_OUT,
>                         ((value, timestamp) -> new
> ProducerRecord<>(TOPIC_OUT, "myKey".getBytes(),
> value.toString().getBytes())),
>                         prodProps,
>                         Semantic.AT_LEAST_ONCE);
>
>         DataStream<Object> stream = env.addSource(kafkaConsumer);
>         KeySelector<Object,Serializable> keys = create();
>         KeyedStream<Object, Serializable> playerId = stream.keyBy(keys);
>
>         playerId.process(new KeyedAggregateFunction(),
> TypeInformation.of(new TypeHint<Object>()
>         {})).addSink(kafkaProducer);
>
>         env.execute();
>     }
>
>     public static KeySelector<Object, Serializable> create()
>     {
>         return record ->
>         {
>             final Serializable key = ((GameZoneInput)record).getPlayerId();
>             return key;
>         };
>     }
> }
>
> Any leads would be appreciated.
>
> Thanks
> Mahima
>