Hi Rick,
Can you try removing
options.setMaxRecordsPerBatch(1000L);
(or setting it to a value greater than 10000L) and dropping the following windowing?

apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
I don't think you need a window function here, because your input is already split
into micro-batches by Spark, and things like discardingFiredPanes or
withAllowedLateness(Duration.ZERO) are Spark's default (and possibly only) behaviour.
(You can still use Window.into to split a batch into smaller windows.)
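Here is a minimal, untested sketch of what I mean, based on the snippets from your
StarterPipeline.java (the per-batch counting and the PostgreSQL write are left out,
and I also set all the options before Pipeline.create is called):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// All options are configured before the pipeline is created.
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");
options.setBatchIntervalMillis(1000L);   // one Spark micro-batch per second
// options.setMaxRecordsPerBatch(...) is intentionally left unset
// (or set it to a value well above 10000L)

Pipeline p = Pipeline.create(options);

// Read from Kafka; each Spark micro-batch already holds roughly one second of data,
// so no explicit Window.into(...) is applied here.
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());

// ... count per batch and write to PostgreSQL as before ...

p.run().waitUntilFinish();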
Nicolas
________________________________
From: [email protected] <[email protected]>
Sent: Tuesday, July 31, 2018 07:59:29
To: [email protected]
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Dear Nicolas,
Yes, I have set this configuration, as follows:
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
However, the results look like the following:
“1158 1000
1159 0
1160 0
1161 0
1162 0
1163 0
1164 1000
1165 0
1166 0
1167 0
1168 0
1169 0
1170 0
….”
Rick
From: Nicolas Viard [mailto:[email protected]]
Sent: Monday, July 30, 2018 5:35 PM
To: [email protected]
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Hello,
I think Spark has a default windowing strategy and pulls data from Kafka every X ms.
You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000), as in the sketch below.
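A minimal, untested sketch, assuming the SparkPipelineOptions setup from your StarterPipeline.java:

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setBatchIntervalMillis(1000L); // pull data from Kafka into one micro-batch per second
Pipeline p = Pipeline.create(options); // create the pipeline after the options are configured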
Best regards,
Nicolas
________________________________
From: [email protected] <[email protected]>
Sent: Monday, July 30, 2018 10:58:26
To: [email protected]
Subject: A windows/trigger Question with kafkaIO over Spark Runner
Dear all
I have a question about the use of windows/triggers.
The following versions of related tools are set in my running program:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
Scala: 2.11.8
Java: 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are available on
GitHub: https://github.com/LinRick/beamkafkaIO
The Kafka producer is configured as follows:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================
The console output of the Kafka producer is:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...
==================================
We hope to see about 10,000 records in each one-second window with the
following settings in my program StarterPipeline.java:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(500000)
    .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
…
==================================
The processed data is then written into PostgreSQL.
The results in the DB are as follows:
224 3000
225 0
226 3000
227 0
228 0
236 0
237 0
238 5000
Unfortunately, the results we were expecting are more like:
224 9000
225 11000
226 9505
227 9829
228 10001
I do not know how to deal with this situation; maybe it is related to data latency?
In addition, I am not sure whether this issue is caused by KafkaIO or by my Spark
runner settings, as in issue BEAM-4632
(https://issues.apache.org/jira/browse/BEAM-4632).
If any further information is needed, please let me know and I will provide it
as soon as possible.
I would highly appreciate your help in overcoming this.
I am looking forward to hearing from you.
Sincerely yours,
Rick
--
This email may contain confidential information. Please do not use or disclose
it in any way and delete it if you are not the intended recipient.