Hi Nicolas, I appreciate that you’ve found some problems in my program.
Your suggestion is great and very useful to me. The current Spark pipeline settings in my program are:

============================================================
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(10000L);
options.setBatchIntervalMillis(1000L);
//options.setSparkMaster("local[*]");
options.setSparkMaster("spark://ubuntu8:7077");
…
apply(Window.<KV<Integer, String>>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(1)))) // windowSize (1000L)
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
============================================================

Here, I run the Spark runner in standalone mode (with two worker nodes):

Workers (4)
Worker Id             Address        State  Cores       Memory
worker-ubuntu9-41086  ubuntu9:41086  ALIVE  2 (2 Used)  4.0 GB (4.0 GB Used)
worker-ubuntu9-34615  ubuntu9:34615  ALIVE  2 (2 Used)  4.0 GB (4.0 GB Used)
worker-ubuntu8-45286  ubuntu8:45286  ALIVE  2 (2 Used)  4.0 GB (4.0 GB Used)
worker-ubuntu8-39776  ubuntu8:39776  ALIVE  2 (2 Used)  4.0 GB (4.0 GB Used)

The results are listed as:

Uuid  count
6132  8758
6133  4300
6134  4960
6135  8860
6136  10000
6137  9480
6138  9830
6139  10000
6140  10001
6141  10000
6142  10000
6143  10000
6144  10000
6145  10000
6146  10000
6147  10000
6148  10000
6149  10000
6150  10000
6151  10000

The results show that:
1. The number of records per window gradually stabilizes at 10,000.
2. There is a data-delay phenomenon.

On the Kafka broker side, the producer run finished:

10000000 records sent, 9999.920001 records/sec (0.95 MB/sec), 0.19 ms avg latency, 144.00 ms max latency, 0 ms 50th, 1 ms 95th, 1 ms 99th, 1 ms 99.9th.

Meanwhile, my program on the Spark runner is still continuously processing the counts.
If I would like to solve this data-delay phenomenon, do you have any suggestions? Maybe I need to increase the number of worker nodes, or revise the settings in Spark's default configuration?

Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Tuesday, July 31, 2018 4:55 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner

Sorry, I didn't see your group function. In my previous code, I used

.apply(Window.<Data>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(windowSize)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())

but I don't remember why I had to add a delay of windowSize (1000L here).

Nicolas

________________________________
From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Tuesday, July 31, 2018 10:24:04
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner

Dear Nicolas,

I think we need the following code:

apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());

to split the data from a streaming source (Kafka) into fixed windows. If we do not use a window function, the following error occurs:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
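As an aside, the window assignment that FixedWindows.of(Duration.standardSeconds(1)) performs before GroupByKey can be illustrated with plain Java. This is only a hypothetical sketch of the idea (the class and method names are mine, not Beam's): each element's timestamp is mapped to the start of its enclosing one-second window, and elements sharing a window start are counted together.

```java
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {
    // Map a timestamp (ms) to the start of its enclosing fixed window,
    // mirroring what FixedWindows.of(Duration.standardSeconds(1)) does in spirit.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long windowSize = 1000L; // 1-second windows
        long[] timestamps = {1158_000L, 1158_250L, 1158_999L, 1159_001L};

        // Count elements per window, as a GroupByKey/Count would after windowing.
        Map<Long, Integer> countsPerWindow = new TreeMap<>();
        for (long ts : timestamps) {
            countsPerWindow.merge(windowStart(ts, windowSize), 1, Integer::sum);
        }
        // The first three timestamps fall in window 1158000; the last in 1159000.
        System.out.println(countsPerWindow);
    }
}
```

The point of the sketch is that the windowing transform only buckets elements; a trigger is still needed to tell the runner when each bucket's aggregate may be emitted, which is why the unbounded GlobalWindow without a trigger raises the IllegalStateException above.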
Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Tuesday, July 31, 2018 3:52 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner

Hi Rick,

Can you try to remove options.setMaxRecordsPerBatch(1000L); (or set it to x > 10000L) and apply

Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes();

?

I think you don't need to use a window function, because your input is split into micro-batches by Spark, and things like discardingFiredPanes or withAllowedLateness(Duration.ZERO) are Spark's default (/only?) behaviour. (You can use a Window.of to split a batch into smaller windows.)

Nicolas

________________________________
From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Tuesday, July 31, 2018 07:59:29
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner

Dear Nicolas,

Yes, I have set that configuration, as:

Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

However, the results are shown in the following.
1158  1000
1159  0
1160  0
1161  0
1162  0
1163  0
1164  1000
1165  0
1166  0
1167  0
1168  0
1169  0
1170  0
…

Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner

Hello,

I think Spark has a default windowing strategy and pulls data from Kafka every X ms. You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,
Nicolas

________________________________
From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Monday, July 30, 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner

Dear all,

I have a question about the use of windows/triggers. The versions of the related tools used in my program are:

==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
Scala: 2.11.8
Java: 1.8
==================================

My programs (KafkaToKafka.java and StarterPipeline.java) are on GitHub: https://github.com/LinRick/beamkafkaIO

The Kafka producer is configured as:

==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================

The Kafka broker console shows:

==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...
==================================

We hope to see about 10,000 records in each one-second window, given the following settings in my program StarterPipeline.java:

==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(500000)
    .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
…
==================================

The processed data are imported into PostgreSQL. The results in the DB are as follows:

224  3000
225  0
226  3000
227  0
228  0
236  0
237  0
238  5000

Unfortunately, the results we were hoping for are:

224  9000
225  11000
226  9505
227  9829
228  10001

I do not know how to deal with this situation; maybe it is related to data latency?

In addition, I am not sure whether this issue is about KafkaIO or whether my Spark runner settings are wrong, as in the issue BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>.

If any further information is needed, I will gladly provide it as soon as possible.
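For what it's worth, the sparse counts above are consistent with simple back-of-the-envelope arithmetic (a hypothetical sketch in plain Java, not Beam code; the class and method names are mine): the producer emits roughly 10,000 records/sec, but with setMaxRecordsPerBatch(1000L) and a one-second batch interval the pipeline can ingest at most 1,000 records per micro-batch, so a backlog builds up, which matches Nicolas's suggestion to remove the cap or raise it above 10,000.

```java
public class IngestBudgetSketch {
    // Records the pipeline can ingest per second, assuming one micro-batch
    // per batch interval, each capped at maxRecordsPerBatch.
    static long ingestPerSecond(long maxRecordsPerBatch, long batchIntervalMillis) {
        return maxRecordsPerBatch * (1000L / batchIntervalMillis);
    }

    public static void main(String[] args) {
        long producedPerSecond = 10_000L; // approximate Kafka producer throughput

        // Settings from the program above: 1000 records/batch, 1-second batches.
        long ingested = ingestPerSecond(1_000L, 1_000L);
        long backlogGrowth = producedPerSecond - ingested;
        System.out.println("ingest/sec = " + ingested
                + ", backlog grows by " + backlogGrowth + "/sec");

        // With the cap raised to match the producer rate, the backlog stops growing.
        long ingestedRaised = ingestPerSecond(10_000L, 1_000L);
        System.out.println("raised cap ingest/sec = " + ingestedRaised);
    }
}
```

Under these assumptions the original settings fall behind by about 9,000 records every second, while a cap of 10,000 (or no cap) keeps pace with the producer, which is what the later counts stabilizing at 10,000 per window suggest.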
I will highly appreciate it if you can help me overcome this. I am looking forward to hearing from you.

Sincerely yours,
Rick

--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.