Hi Nicolas,
Thank you for finding the problems in my program; your suggestion is very helpful.
The Spark pipeline in my program is now configured as follows:
============================================================
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(10000L);
options.setBatchIntervalMillis(1000L);
//options.setSparkMaster("local[*]");
options.setSparkMaster("spark://ubuntu8:7077");
…
apply(Window.<KV<Integer, String>>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(1))))   // windowSize (1000L)
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
============================================================
Here, I run the Spark runner on a Spark standalone cluster (two worker machines, four worker processes):
Workers (4)
Worker Id              Address         State   Cores        Memory
worker-ubuntu9-41086   ubuntu9:41086   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu9-34615   ubuntu9:34615   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu8-45286   ubuntu8:45286   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu8-39776   ubuntu8:39776   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
and the results are listed below:
Uuid count
6132 8758
6133 4300
6134 4960
6135 8860
6136 10000
6137 9480
6138 9830
6139 10000
6140 10001
6141 10000
6142 10000
6143 10000
6144 10000
6145 10000
6146 10000
6147 10000
6148 10000
6149 10000
6150 10000
6151 10000
The results show that:
1. The number of records per window gradually stabilizes at 10,000.
2. There is a noticeable data delay.
On the Kafka broker side, the producer has already finished:
10000000 records sent, 9999.920001 records/sec (0.95 MB/sec), 0.19 ms avg latency, 144.00 ms max latency, 0 ms 50th, 1 ms 95th, 1 ms 99th, 1 ms 99.9th.
yet my program on the Spark runner is still processing and counting the data.
Do you have any suggestions for reducing this data delay? Perhaps I need to increase the number of worker nodes, or revise the settings in spark-defaults.conf?
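For reference, here is a minimal sketch of the Spark-runner options from this thread that control how much data lands in each micro-batch; the concrete values below are only illustrative guesses, not verified settings:

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
// One micro-batch per second; the per-batch record cap should be at least
// as large as the ~10,000 records/sec that the Kafka producer generates.
options.setBatchIntervalMillis(1000L);
options.setMaxRecordsPerBatch(20000L);   // illustrative value, not a recommendation
options.setSparkMaster("spark://ubuntu8:7077");
Pipeline p = Pipeline.create(options);

Cluster-level resources (executor cores and memory) are normally set in spark-defaults.conf or on the spark-submit command line rather than in the pipeline code.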
Rick
From: Nicolas Viard [mailto:[email protected]]
Sent: Tuesday, July 31, 2018 4:55 PM
To: [email protected]
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Sorry, I didn't see your group function.
In my previous code, I used
.apply(Window.<Data>into(new GlobalWindows())
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(windowSize)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
but I don't remember why I had to add a delay of windowSize (1000L here).
Nicolas
________________________________
De : [email protected]<mailto:[email protected]>
<[email protected]<mailto:[email protected]>>
Envoyé : mardi 31 juillet 2018 10:24:04
À : [email protected]<mailto:[email protected]>
Objet : RE: A windows/trigger Question with kafkaIO over Spark Runner
Dear Nicolas,
I think that we need the following code:
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
to split the data from the streaming source (Kafka) into fixed windows. If we do not use the window function, the following error occurs:
Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
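For context, a minimal sketch of what I mean; Count.perKey only stands in for our actual aggregation, and readData is the unbounded PCollection read from KafkaIO:

// Window the unbounded Kafka stream before any GroupByKey-based transform.
PCollection<KV<Integer, String>> windowed = readData.apply(
    Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

// With the stream windowed, a GroupByKey-based transform such as Count.perKey is allowed.
PCollection<KV<Integer, Long>> counts = windowed.apply(Count.perKey());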
Rick
From: Nicolas Viard [mailto:[email protected]]
Sent: Tuesday, July 31, 2018 3:52 PM
To: [email protected]<mailto:[email protected]>
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Hi Rick,
Can you try to remove
options.setMaxRecordsPerBatch(1000L);
(or set it to a value greater than 10000L) and also remove
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
?
I think you don't need a window function, because your input is already split into micro-batches by Spark, and things like discardingFiredPanes or withAllowedLateness(Duration.ZERO) are Spark's default (/only?) behaviour.
(You can use Window.into to split a batch into smaller windows.)
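A minimal sketch of what I mean by smaller windows (the 500 ms size is only an example; readData is your Kafka input):

// Example only: 500 ms fixed windows inside a 1 s micro-batch.
PCollection<KV<Integer, String>> smallWindows = readData.apply(
    Window.<KV<Integer, String>>into(FixedWindows.of(Duration.millis(500))));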
Nicolas
________________________________
De : [email protected]<mailto:[email protected]>
<[email protected]<mailto:[email protected]>>
Envoyé : mardi 31 juillet 2018 07:59:29
À : [email protected]<mailto:[email protected]>
Objet : RE: A windows/trigger Question with kafkaIO over Spark Runner
Dear Nicolas,
Yes, I have set this configuration, as:
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData.apply(
    Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
However, the result is as follows:
1158 1000
1159 0
1160 0
1161 0
1162 0
1163 0
1164 1000
1165 0
1166 0
1167 0
1168 0
1169 0
1170 0
…
Rick
From: Nicolas Viard [mailto:[email protected]]
Sent: Monday, July 30, 2018 5:35 PM
To: [email protected]<mailto:[email protected]>
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Hello,
I think Spark has a default windowing strategy and pulls data from Kafka every X ms.
You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).
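For example, a minimal sketch (only the batch-interval setter matters here; the rest mirrors the options you already build):

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
// Ask the Spark runner to create one micro-batch every 1000 ms.
options.setBatchIntervalMillis(1000L);
Pipeline p = Pipeline.create(options);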
Best regards,
Nicolas
________________________________
De : [email protected]<mailto:[email protected]>
<[email protected]<mailto:[email protected]>>
Envoyé : lundi 30 juillet 2018 10:58:26
À : [email protected]<mailto:[email protected]>
Objet : A windows/trigger Question with kafkaIO over Spark Runner
Dear all,
I have a question about the use of windows/triggers.
The versions of the related tools used in my program are:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka 2.11-0.10.1.1
Scala 2.11.8
Java 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are available on GitHub: https://github.com/LinRick/beamkafkaIO
The Kafka producer (performance test) is configured as:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================
The console output of the Kafka producer is:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 ms max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 ms max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 ms max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 ms max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 ms max latency.
...
==================================
We expect about 10,000 records in each one-second window, given the following settings in my program StarterPipeline.java:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(500000)
    .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData.apply(
    Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
…
==================================
The processed data is written to PostgreSQL. The results in the DB are as follows (Uuid, count):
224 3000
225 0
226 3000
227 0
228 0
236 0
237 0
238 5000
The results we were hoping for, however, are:
224 9000
225 11000
226 9505
227 9829
228 10001
I do not know how to deal with this situation; perhaps it is related to data latency?
In addition, I am not sure whether this issue is about KafkaIO or about my Spark runner settings, as in BEAM-4632 (https://issues.apache.org/jira/browse/BEAM-4632).
If any further information is needed, please let me know and I will provide it as soon as possible.
I would highly appreciate your help in overcoming this.
I am looking forward to hearing from you.
Sincerely yours,
Rick