Hi Tovi,

Your code seems to be correct, and as Fabian described, you don't need a parallelism of 2 to read 2 partitions; a single parallel instance of the source can read multiple partitions.
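To illustrate that point, here is a minimal sketch of how partitions might be spread over source subtasks. It assumes a plain modulo rule; the class and method names are hypothetical, and this is not the connector's actual assignment code, which may differ between Flink versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionAssignmentSketch {

    // Hypothetical helper (not a Flink API): spreads partition ids
    // 0..numPartitions-1 over subtasks 0..parallelism-1 with a modulo rule.
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.put(subtask, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // One subtask ends up owning both partitions.
        System.out.println("parallelism 1: " + assign(2, 1)); // parallelism 1: {0=[0, 1]}
        // Two subtasks own one partition each.
        System.out.println("parallelism 2: " + assign(2, 2)); // parallelism 2: {0=[0], 1=[1]}
    }
}
```

Under this assumption, a source with parallelism 1 still covers every partition, and a parallelism higher than the partition count simply leaves some subtasks idle.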
I'm not sure yet what could have gone wrong from a first look, so I need to ask you a few questions:

Could you let me know which Flink version you are on?

Also, could you try searching the logs for consumer messages such as:
"Consumer subtask ... will start reading the following (numPartitions) partitions from ...: (partition info)"

You can try setting the parallelism of the source to 1, and you should then see that the subtask is reading 2 partitions.

From the metrics log it does seem like the consumer has picked up both partitions 0 and 1, but no records seem to be coming from partition 0. Have you tried using a non-Flink consumer, for example the simple console consumer, to read the topic and check whether records from both partitions are consumed properly?

Let me know, I'm sure we can figure this out.

Cheers,
Gordon

On 24 September 2017 at 9:44:28 AM, Sofer, Tovi (tovi.so...@citi.com) wrote:

Thank you Fabian.

Fabian, Gordon, am I missing something in the consumer setup? Should I configure the consumer in some way to subscribe to two partitions?

Thanks and regards,
Tovi

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, 19 September 2017 22:58
To: Sofer, Tovi [ICG-IT]
Cc: user; Tzu-Li (Gordon) Tai
Subject: Re: Flink kafka consumer that read from two partitions in local mode

Hi Tovi,

your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.
Just a side note: you don't need to set the parallelism to 2 to read from two partitions. A single consumer instance can read from multiple partitions.

Best,
Fabian

2017-09-19 17:02 GMT+02:00 Sofer, Tovi <tovi.so...@citi.com>:

Hi,

I am trying to set up a FlinkKafkaConsumer which reads from two partitions in local mode, using setParallelism(2). The producer writes to two partitions (as shown in the metrics report), but the consumer always seems to read from one partition only. Am I missing something in the partition configuration?
Code:

Producer setup:

    Configuration localConfig = new Configuration();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
    env.setParallelism(2);
    String kafkaPort = parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
    SingleOutputStreamOperator<String> fixMsgSource =
        env.addSource(srcMsgProvider.getFixMsgSource(), TypeInformation.of(String.class))
           .name(sourceGenerationType.getValue());
    fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort, TOPIC_NAME, new SimpleStringSchema()))
        .name("fix_topic");
    env.execute("MsgSimulatorJob");

Consumer setup:

    String topicName = "fix";
    Configuration conf = new Configuration();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(2);
    env.getConfig().setGlobalJobParameters(configParams); // make parameters available in the web interface
    DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new SimpleStringAndTimestampDeserializationSchema();
    FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer =
        new FlinkKafkaConsumer010<>(topicName, deserializationSchema, kafkaParams.getProperties());
    DataStream<Tuple2<Long, String>> fixMessagesStream =
        env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);

As you can see in the output, only one consumer partition seems to be used:

Producer output:

2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.1.numRecordsInPerSecond: 19836.033333333333
2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.0.numRecordsInPerSecond: 20337.933333333334
2017-09-19 14:40:45,819 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0

Consumer output:

2017-09-19 14:40:45,116 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0

Thanks and regards,
Tovi
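
For comparing the two source instances in a metric dump like the one above, a small helper along these lines may be handy. It is only a sketch: the class name, method name, and regular expression are hypothetical, and it assumes the number after the operator name is the subtask index, as in Flink's default metric scope format.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SubtaskRateSummary {

    // Matches "<operator name>.<index>.numRecordsOutPerSecond: <value>" at the end of a line.
    private static final Pattern RATE =
            Pattern.compile("\\.(\\d+)\\.numRecordsOutPerSecond: ([0-9.Ee+-]+)\\s*$");

    // Returns the last reported numRecordsOutPerSecond value per index.
    static Map<Integer, Double> latestOutRates(Iterable<String> lines) {
        Map<Integer, Double> rates = new TreeMap<>();
        for (String line : lines) {
            Matcher m = RATE.matcher(line);
            if (m.find()) {
                rates.put(Integer.parseInt(m.group(1)), Double.parseDouble(m.group(2)));
            }
        }
        return rates;
    }

    public static void main(String[] args) {
        // Two lines shortened from the consumer output in this thread.
        List<String> sample = Arrays.asList(
                "Flink Streaming Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666",
                "Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0");
        latestOutRates(sample).forEach((index, rate) ->
                System.out.println("index " + index + " -> " + rate + " records/s"));
    }
}
```

Applied to the consumer output above, this would report a non-zero rate only for index 1, which matches the symptom described in the thread.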