Hi Rick,
You can limit your Spark processing by passing the following option to your
beam pipeline:
*MaxRecordsPerBatch*
see
https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/runners/spark/SparkPipelineOptions.html#getMaxRecordsPerBatch--
Hope it helps.
JC
On Mon, Jan 28, 2019 at 10:57 AM <[email protected]> wrote:
> Dear Raghu,
>
>
>
> I add the line: “PCollection<Integer> reshuffled =
> windowKV.apply(Reshuffle.viaRandomKey());” in my program.
>
>
>
> I tried to control the streaming data size: 100,000/1sec to decrease the
> processing time.
>
>
>
> The following settings are used for my project.
>
>
>
> 1. One topic / 2 partitions
>
> 2. Two workers / two executors
>
>
>
> 3. The spark-default setting is:
>
> spark.executor.instances=2
>
> spark.executor.cores=4
>
> spark.executor.memory=2048m
>
> spark.default.parallelism=200
>
>
>
> spark.streaming.blockInterval=50ms
>
> spark.streaming.kafka.maxRatePerPartition=50,000
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.concurrentJobs = 1
>
> spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
>
> spark.executor.extraJavaOptions=-Xss100M
>
>
>
> spark.shuffle.consolidateFiles=true
>
> spark.streaming.unpersist=true
>
> spark.streaming.stopGracefullyOnShutdown=true
>
>
>
> I hope that the data size is controlled at 100,000.
>
>
>
> Here,
>
>
>
> The data size is always over 100,000. The setting of
> “spark.streaming.kafka.maxRatePerPartition” confused me.
>
>
>
> That does not seem to work for me.
>
>
>
> Rick
>
>
>
> *From:* Raghu Angadi [mailto:[email protected]]
> *Sent:* Saturday, January 26, 2019 3:06 AM
> *To:* [email protected]
> *Subject:* Re: kafkaIO Consumer Rebalance with Spark Runner
>
>
>
> You have 32 partitions. Reading can not be distributed to more than 32
> parallel tasks.
>
> If you have a log of processing for each record after reading, you can
> reshuffle the messages before processing them, that way the processing
> could be distributed to more tasks. Search for previous threads about
> reshuffle in Beam lists.
>
>
>
> On Thu, Jan 24, 2019 at 7:23 PM <[email protected]> wrote:
>
> Dear all,
>
>
>
> I am using the kafkaIO sdk in my project (Beam with Spark runner).
>
>
>
> The problem about task skew is shown as the following figure.
>
>
>
> My running environment is:
>
> OS: Ubuntn 14.04.4 LTS
>
> The version of related tools is:
>
> java version: "1.8.0_151"
>
> Beam version: 2.9.0 (Spark runner with Standalone mode)
>
> Spark version: 2.3.1 Standalone mode
>
> Execution condition:
>
> Master/Driver node: ubuntu7
>
> Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
>
> The number of executors is 8
>
>
>
> Kafka Broker: 2.10-0.10.1.1
>
> Broker node at ubuntu7
>
> Kafka Client:
>
> The topic: kafkasink32
>
> kafkasink32 Partitions: 32
>
>
>
> The programming of my project for kafkaIO SDK is as:
>
>
> ==============================================================================
>
> Map<String, Object> map = ImmutableMap.<String, Object>*builder*()
>
> .put("group.id", (Object)"test-consumer-group")
>
> .build();
>
> List<TopicPartition> topicPartitions = *new** ArrayList()*;
>
> *for*(*int* i = 0; i < 32; i++) {
>
> topicPartitions.add(*new* TopicPartition(
> "kafkasink32",i));
>
> }
>
> PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long,
> String>*read*()
>
> .withBootstrapServers("ubuntu7:9092")
>
> .updateConsumerProperties(map)
>
> .withKeyDeserializer(LongDeserializer.*class*)
>
> .withValueDeserializer(StringDeserializer.*class*)
>
> .withTopicPartitions(topicPartitions)
>
> .withoutMetadata()
>
> );
>
>
> ==============================================================================
>
> Here I have two directions to solve this problem:
>
>
>
> 1. Using the following sdk from spark streaming
>
>
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
>
> LocationStrategies.PreferConsistent: Use in most cases as it consistently
> distributes partitions across all executors.
>
>
>
> If we would like to use this feature, we have not idea to set this in
> kafkaIO SDK.
>
>
>
> 2. Setting the related configurations of kafka to perform the
> consumer rebalance
>
> set consumer group? Set group.id?
>
>
>
> If we need to do No2., could someone give me some ideas to set
> configurations?
>
>
>
> If anyone provides any direction to help us to overcome this problem, we
> would appreciate it.
>
>
>
> Thanks.
>
>
>
> Sincerely yours,
>
>
>
> Rick
>
>
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>
--
JC