Thank you Fabian.

Fabian, Gordon, am I missing something in consumer setup?
Should I configure consumer in some way to subscribe to two partitions?

Thanks and regards,
Tovi

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: יום ג 19 ספטמבר 2017 22:58
To: Sofer, Tovi [ICG-IT]
Cc: user; Tzu-Li (Gordon) Tai
Subject: Re: Flink kafka consumer that read from two partitions in local mode

Hi Tovi,
your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.
Just a side note: you don't need to set the parallelism to 2 to read from two 
partitions. A single consumer instance reads can read from multiple partitions.
Best,
Fabian

2017-09-19 17:02 GMT+02:00 Sofer, Tovi 
<tovi.so...@citi.com<mailto:tovi.so...@citi.com>>:
Hi,

I am trying to setup FlinkKafkaConsumer which reads from two partitions in 
local mode, using  setParallelism=2.
The producer writes to two partition (as it is shown in metrics report).
But the consumer seems to read always from one partition only.
Am I missing something in partition configuration?

Code:


Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);

env.setParallelism(2);

String kafkaPort = 
parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());

SingleOutputStreamOperator<String> fixMsgSource = 
env.addSource(srcMsgProvider.getFixMsgSource(), 
TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:"  + kafkaPort, 
TOPIC_NAME, new SimpleStringSchema()))

.name(“fix_topic”);

env.execute(“MsgSimulatorJob”);


Consumer setup:

String topicName = “fix”;
Configuration conf = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters 
available in the web interface
DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new 
SimpleStringAndTimestampDeserializationSchema ();
FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new 
FlinkKafkaConsumer010<>(topicName, deserializationSchema, 
kafkaParams.getProperties());

DataStream<Tuple2<Long, String>> fixMessagesStream = 
env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);

As you can see in output, only 1 consumer partition seems to be used:
Producer output:
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.1.numRecordsInPerSecond: 19836.033333333333
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.0.numRecordsInPerSecond: 20337.933333333334
2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
Consumer output:
2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0


Thanks and regards,
Tovi

Reply via email to