Hi Rick,

Can you try removing

options.setMaxRecordsPerBatch(1000L);

(or setting it to a value greater than 10000L) and removing this windowing step?

  apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
      .triggering(AfterWatermark.pastEndOfWindow()
          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.ZERO)))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

I think you don't need a window function here: your input is already split 
into micro-batches by Spark, and things like discardingFiredPanes or 
withAllowedLateness(Duration.ZERO) are Spark's default (and possibly only) behaviour.

(You can still use Window.into to split a batch into smaller windows, as in the sketch below.)
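For example, a minimal sketch of what I mean (reusing your readData 
PCollection; everything downstream stays unchanged):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Plain fixed windows, no custom trigger: the default trigger fires once
// when the watermark passes the end of each window, and the Spark runner's
// micro-batching takes care of the rest.
PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(
        FixedWindows.of(Duration.standardSeconds(1))));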


Nicolas


________________________________
From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Tuesday, July 31, 2018 07:59:29
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner


Dear Nicolas,



Yes, I have set this configuration, as follows:



Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…

PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());



However, the result looks like the following:

“1158  1000
1159      0
1160      0
1161      0
1162      0
1163      0
1164   1000
1165      0
1166      0
1167      0
1168      0
1169      0
1170      0
…”



Rick



From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner



Hello,

I think Spark has a default windowing strategy and pulls data from Kafka every 
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000), as in the sketch below.
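For example (a sketch, assuming the options are built from the command-line 
args as usual):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
// Make each Spark micro-batch cover roughly one second of input.
options.setBatchIntervalMillis(1000L);
Pipeline p = Pipeline.create(options);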

Best regards,

Nicolas

________________________________

From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Monday, July 30, 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner



Dear all,



I have a question about the use of windows/triggers.

The following versions of the related tools are used in my program:

==================================

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

Scala: 2.11.8

Java: 1.8

==================================

My programs (KafkaToKafka.java and StarterPipeline.java) are available on 
GitHub: https://github.com/LinRick/beamkafkaIO

The Kafka producer (performance test) is configured as:

==================================

/kafka_broker/bin/kafka-producer-perf-test.sh \
  --num-records 10000000 \
  --record-size 100 \
  --topic kafkasink \
  --throughput 10000 \
  --producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==================================



The producer's console output is:

==================================

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.

50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.

50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.

50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...

==================================



We expect about 10,000 records in each one-second window (matching the 
producer's ~10,000 records/sec), given the following settings in my program 
StarterPipeline.java:

==================================

…

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(500000)
    .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

…

==================================
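The counting step is elided above ("…"); a minimal sketch of such a 
per-window count (this is a sketch, not my exact code; see 
StarterPipeline.java for the real version) is:

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Count the elements in each 1-second window. withoutDefaults() is required
// because the input is in a non-global window.
PCollection<Long> countsPerWindow = readData1
    .apply(Count.<KV<Integer, String>>globally().withoutDefaults());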

The processed data are written into PostgreSQL.

The results in the DB look like this:

224   3000
225      0
226   3000
227      0
228      0
236      0
237      0
238   5000



Unfortunately, this differs from the results we expected, which would look like:

224    9000
225   11000
226    9505
227    9829
228   10001



I do not know how to deal with this situation; maybe it is related to data latency?



In addition, I am not sure whether this issue is about KafkaIO or whether my 
Spark runner settings are wrong, as in 
BEAM-4632 <https://issues.apache.org/jira/browse/BEAM-4632>.



If any further information is needed, please let me know and I will provide 
it as soon as possible.



I would highly appreciate it if you could help me overcome this.



I am looking forward to hearing from you.



Sincerely yours,



Rick


--
This email may contain confidential information. Please do not use or 
disclose it in any way and delete it if you are not the intended recipient.


