RE: kafkaIO Consumer Rebalance with Spark Runner

2019-01-30 Thread linrick
Dear Alexey,

I have tried to use the following settings:

Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("topic", (Object) "kafkasink2")
    .put("group.id", (Object) "test-consumer-group")
    .put("partition.assignment.strategy",
        (Object) "org.apache.kafka.clients.consumer.RangeAssignor")
    .put("enable.auto.commit", (Object) "true")
    .put("auto.offset.reset", (Object) "earliest")
    .put("max.poll.records", (Object) "10")
    .build();
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers(KafkaProperties.KAFKA_SERVER_URL)
    .withTopic("kafkasink2")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata()
);
The experimental results are shown in the following figure:

(inline screenshot omitted)

As the figure shows, this does not work for my project.

When we followed the suggestion from @Juan Carlos Garcia,
“You can limit your Spark processing by passing the following option to your
Beam pipeline: MaxRecordsPerBatch”,

that did work for us.
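
For reference, a minimal sketch of how that option can be set on the Spark runner, in the style of the SparkPipelineOptions code shown later in this thread (the 500L value and the class name are only illustrative assumptions, not our actual configuration); it should also be settable on the command line as --maxRecordsPerBatch:

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MaxRecordsPerBatchSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    // Cap how many records each Spark micro-batch may pull from the unbounded source.
    options.setMaxRecordsPerBatch(500L);
    Pipeline p = Pipeline.create(options);
    // ... build the KafkaIO pipeline here, then call p.run();
  }
}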

Thanks for your idea.

Rick

From: Alexey Romanenko [mailto:aromanenko@gmail.com]
Sent: Wednesday, January 30, 2019 2:10 AM
To: user@beam.apache.org
Subject: Re: kafkaIO Consumer Rebalance with Spark Runner

Rick,

I think “spark.streaming.kafka.maxRatePerPartition” won’t work for you since, 
afaik, it’s a configuration option of the Spark Kafka reader, and Beam KafkaIO 
doesn’t use it (it has its own consumer implementation).
At the same time, if you want to set an option for the Beam KafkaIO consumer config, 
then you should use the "updateConsumerProperties()” method.


On 28 Jan 2019, at 10:56, <linr...@itri.org.tw> wrote:

Dear Raghu,

I added the line “PCollection<KV<Long, String>> reshuffled = 
windowKV.apply(Reshuffle.viaRandomKey());” to my program.

I tried to limit the streaming data rate to 100,000 records per second to decrease 
the processing time.

The following settings are used for my project.

1.  One topic / 2 partitions

2.  Two workers / two executors

3.  The spark-default setting is:
spark.executor.instances=2
spark.executor.cores=4
spark.executor.memory=2048m
spark.default.parallelism=200

spark.streaming.blockInterval=50ms
spark.streaming.kafka.maxRatePerPartition=50,000
spark.streaming.backpressure.enabled=true
spark.streaming.concurrentJobs = 1
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
spark.executor.extraJavaOptions=-Xss100M

spark.shuffle.consolidateFiles=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true

I hoped that the data size would be capped at 100,000.

Here is what I observe (inline screenshot omitted):

the data size is always over 100,000, so the setting of 
“spark.streaming.kafka.maxRatePerPartition” confuses me.

It does not seem to work for me.

Rick

From: Raghu Angadi [mailto:ang...@gmail.com]
Sent: Saturday, January 26, 2019 3:06 AM
To: user@beam.apache.org
Subject: Re: kafkaIO Consumer Rebalance with Spark Runner

You have 32 partitions. Reading cannot be distributed to more than 32 parallel 
tasks.
If you have a lot of processing for each record after reading, you can 
reshuffle the messages before processing them; that way the processing can be 
distributed to more tasks. Search for previous threads about reshuffle in the Beam 
lists.
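
For context, a minimal sketch of what such a reshuffle can look like right after the KafkaIO read, assuming the KV<Long, String> element type used elsewhere in this thread (the variable and step names are only illustrative):

import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// readKafkaData is the output of KafkaIO.<Long, String>read()...withoutMetadata()
PCollection<KV<Long, String>> reshuffled =
    readKafkaData.apply("BreakFusion", Reshuffle.viaRandomKey());
// ParDos applied to 'reshuffled' can now be spread over more tasks than the
// number of Kafka partitions.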

On Thu, Jan 24, 2019 at 7:23 PM <linr...@itri.org.tw> wrote:
Dear all,

I am using the kafkaIO sdk in my project (Beam with Spark runner).

The task skew problem is shown in the following figure (inline screenshot omitted).


My running environment is:
OS: Ubuntu 14.04.4 LTS
The versions of the related tools are:
Java version: "1.8.0_151"
Beam version: 2.9.0 (Spark runner, Standalone mode)
Spark version: 2.3.1, Standalone mode
  Execution setup:
  Master/Driver node: ubuntu7
  Worker nodes: ubuntu8 (4 executors); ubuntu9 (4 executors)
The total number of executors is 8

Kafka Broker: 2.10-0.10.1.1
  Broker node at ubuntu7
Kafka Client:
The topic: kafkasink32
kafkasink32 Partitions: 32

The KafkaIO code in my project is as follows:
==
Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("group.id", (Object) "test-consumer-group")
    .build();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int i = 0; i < 32; i++) {
    topicPartitions.add(new TopicPartition("kafkasink32", i));
}
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTopicPartitions(topicPartitions)
    .withoutMetadata()
);
==
Here I have two directions for solving this problem:

1.  Using the following SDK from Spark
RE: kafkaIO Consumer Rebalance with Spark Runner

2019-01-28 Thread linrick
Dear Raghu,

I added the line “PCollection<KV<Long, String>> reshuffled = 
windowKV.apply(Reshuffle.viaRandomKey());” to my program.

I tried to limit the streaming data rate to 100,000 records per second to decrease 
the processing time.

The following settings are used for my project.


1.  One topic / 2 partitions
(inline screenshot omitted)

2.  Two workers / two executors


3.  The spark-default setting is:
spark.executor.instances=2
spark.executor.cores=4
spark.executor.memory=2048m
spark.default.parallelism=200

spark.streaming.blockInterval=50ms
spark.streaming.kafka.maxRatePerPartition=50,000
spark.streaming.backpressure.enabled=true
spark.streaming.concurrentJobs = 1
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
spark.executor.extraJavaOptions=-Xss100M

spark.shuffle.consolidateFiles=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true

I hoped that the data size would be capped at 100,000.

Here is what I observe (inline screenshot omitted):

the data size is always over 100,000, so the setting of 
“spark.streaming.kafka.maxRatePerPartition” confuses me.

It does not seem to work for me.

Rick

From: Raghu Angadi [mailto:ang...@gmail.com]
Sent: Saturday, January 26, 2019 3:06 AM
To: user@beam.apache.org
Subject: Re: kafkaIO Consumer Rebalance with Spark Runner

You have 32 partitions. Reading cannot be distributed to more than 32 parallel 
tasks.
If you have a lot of processing for each record after reading, you can 
reshuffle the messages before processing them; that way the processing can be 
distributed to more tasks. Search for previous threads about reshuffle in the Beam 
lists.

On Thu, Jan 24, 2019 at 7:23 PM <linr...@itri.org.tw> wrote:
Dear all,

I am using the kafkaIO sdk in my project (Beam with Spark runner).

The task skew problem is shown in the following figure (inline screenshot omitted).

My running environment is:
OS: Ubuntu 14.04.4 LTS
The versions of the related tools are:
Java version: "1.8.0_151"
Beam version: 2.9.0 (Spark runner, Standalone mode)
Spark version: 2.3.1, Standalone mode
  Execution setup:
  Master/Driver node: ubuntu7
  Worker nodes: ubuntu8 (4 executors); ubuntu9 (4 executors)
The total number of executors is 8

Kafka Broker: 2.10-0.10.1.1
  Broker node at ubuntu7
Kafka Client:
The topic: kafkasink32
kafkasink32 Partitions: 32

The KafkaIO code in my project is as follows:
==
Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("group.id", (Object) "test-consumer-group")
    .build();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int i = 0; i < 32; i++) {
    topicPartitions.add(new TopicPartition("kafkasink32", i));
}
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTopicPartitions(topicPartitions)
    .withoutMetadata()
);
==
Here I have two directions for solving this problem:

1.  Using the following SDK from Spark Streaming:
https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
LocationStrategies.PreferConsistent: use in most cases, as it consistently 
distributes partitions across all executors.

If we would like to use this feature, we have no idea how to set it in the KafkaIO 
SDK.


2.  Setting the related Kafka configurations to perform a consumer 
rebalance.

Should we set the consumer group? Set group.id?

If we need to follow direction 2, could someone give me some ideas about which configurations to set?
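
For what it is worth, a rough sketch of how such consumer settings can be handed to KafkaIO through updateConsumerProperties (the RoundRobinAssignor value is only an example strategy, not a recommendation; whether this actually improves balancing on the Spark runner is exactly the open question here):

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Consumer settings passed through to the Kafka consumer used by KafkaIO.
Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
    .put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
    .put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor")
    .build();

// ... then: KafkaIO.<Long, String>read().updateConsumerProperties(consumerConfig) ...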


If anyone can provide any direction to help us overcome this problem, we would 
appreciate it.


Thanks.

Sincerely yours,

Rick





kafkaIO Consumer Rebalance with Spark Runner

2019-01-24 Thread linrick
Dear all,

I am using the kafkaIO sdk in my project (Beam with Spark runner).

The task skew problem is shown in the following figure (inline screenshot omitted).

My running environment is:
OS: Ubuntu 14.04.4 LTS
The versions of the related tools are:
Java version: "1.8.0_151"
Beam version: 2.9.0 (Spark runner, Standalone mode)
Spark version: 2.3.1, Standalone mode
  Execution setup:
  Master/Driver node: ubuntu7
  Worker nodes: ubuntu8 (4 executors); ubuntu9 (4 executors)
The total number of executors is 8

Kafka Broker: 2.10-0.10.1.1
  Broker node at ubuntu7
Kafka Client:
The topic: kafkasink32
kafkasink32 Partitions: 32

The KafkaIO code in my project is as follows:
==
Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("group.id", (Object) "test-consumer-group")
    .build();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int i = 0; i < 32; i++) {
    topicPartitions.add(new TopicPartition("kafkasink32", i));
}
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTopicPartitions(topicPartitions)
    .withoutMetadata()
);
==
Here I have two directions for solving this problem:

1.  Using the following SDK from Spark Streaming:
https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
LocationStrategies.PreferConsistent: use in most cases, as it consistently 
distributes partitions across all executors.

If we would like to use this feature, we have no idea how to set it in the KafkaIO 
SDK.


2.  Setting the related Kafka configurations to perform a consumer 
rebalance.

Should we set the consumer group? Set group.id?

If we need to follow direction 2, could someone give me some ideas about which configurations to set?


If anyone can provide any direction to help us overcome this problem, we would 
appreciate it.


Thanks.

Sincerely yours,

Rick





RE: A windows/trigger Question with kafkaIO over Spark Runner

2018-07-31 Thread linrick
 are Spark's default (/only?) behaviour.

(You can use a Window.of to split a batch into smaller windows.)



Nicolas



From: linr...@itri.org.tw
Sent: Tuesday, 31 July 2018 07:59:29
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner



Dear Nicolas,



Yes, I have set this configuration, as follows:



Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());



However, the result shows the following:

“1158  1000
1159  0
1160  0
1161  0
1162  0
1163  0
1164  1000
1165  0
1166  0
1167  0
1168  0
1169  0
1170  0
….”



Rick



From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org<mailto:user@beam.apache.org>
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner



Hello,

I think Spark has a default windowing strategy and pulls data from kafka every 
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,

Nicolas



From: linr...@itri.org.tw
Sent: Monday, 30 July 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner



Dear all



I have a question about the use of windows/triggers.

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

======

My programs (KafkaToKafka.java and StarterPipeline.java) are available on 
GitHub: https://github.com/LinRick/beamkafkaIO

The configuration setting of Kafka broker is:

==

/kafka_broker/bin/kafka-producer-perf-test.sh \

--num-records 1000 \

--record-size 100 \

--topic kafkasink \

--throughput 1 \

--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==



The display of Kafka broker on console is as:

==

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...

==



We hope to see about 10,000 records in each one-second window, given the 
following settings in my program StarterPipeline.java:

==

…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(50)
  .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
…

==

The processed data will be imported into PostgreSQL.

The results in the DB are shown as follows.

224  3000

225  0

226  3000

227  0

228  0

236  0

237  0

238  5000



Unfortunately, the results we were hoping for are:

224  9000

225  11000

226  9505

227  9829

228  10001



I do not know how to deal with this situation; maybe it is related to data latency?



1. In addition, I am not sure whether this issue is about KafkaIO or whether my 
Spark runner settings are wrong, as in issue 
BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>

RE: A windows/trigger Question with kafkaIO over Spark Runner

2018-07-31 Thread linrick
Dear Nicolas,

I think that we need the following code:
“apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
  .triggering(AfterWatermark.pastEndOfWindow()
    .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
  .withAllowedLateness(Duration.ZERO)
  .discardingFiredPanes());”

to split the data into fixed windows for a streaming data source (Kafka).

If we do not use the window function, the following error occurs:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot 
be applied to non-bounded PCollection in the GlobalWindow without a trigger. 
Use a Window.into or Window.triggering transform prior to GroupByKey.
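
For context, a minimal sketch of the pattern the error message asks for, i.e. applying Window.into before a GroupByKey-style transform on an unbounded source (the KV<Integer, String> element type follows the code in this thread; the variable names are only illustrative):

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// readData is the unbounded PCollection<KV<Integer, String>> read from KafkaIO.
PCollection<KV<Integer, Long>> counts = readData
    // Window first, so the GroupByKey inside Count.perKey() has window boundaries to group by.
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1))))
    .apply(Count.perKey());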

Rick


From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Tuesday, July 31, 2018 3:52 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner


Hi Rick,

Can you try to remove

options.setMaxRecordsPerBatch(1000L);
(or set it to x>1L) and
  apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
?

I think you don't need to use a window function because your input is split into 
micro-batches by Spark, and things like discardingFiredPanes or 
withAllowedLateness(Duration.ZERO) are Spark's default (/only?) behaviour.

(You can use a Window.of to split a batch into smaller windows.)



Nicolas


From: linr...@itri.org.tw
Sent: Tuesday, 31 July 2018 07:59:29
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner


Dear Nicolas,



Yes, I have set this configuration, as follows:



Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());



However, the result shows the following:

“1158  1000
1159  0
1160  0
1161  0
1162  0
1163  0
1164  1000
1165  0
1166  0
1167  0
1168  0
1169  0
1170  0
….”



Rick



From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org<mailto:user@beam.apache.org>
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner



Hello,

I think Spark has a default windowing strategy and pulls data from kafka every 
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,

Nicolas



From: linr...@itri.org.tw
Sent: Monday, 30 July 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner



Dear all



I have a question about the use of windows/triggers.

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==

My programs (KafkaToKafka.java and StarterPipeline.java) are available on 
GitHub: https://github.com/LinRick/beamkafkaIO

The configuration setting of Kafka broker is:

==

/kafka_broker/bin/kafka-producer-perf-test.sh \

--num-records 1000 \

--record-size 100 \

--topic kafkasink \

--throughput 1 \

--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==



The display of Kafka broker on console is as:

==

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...

==



We hope to see about 10,000 records in each one-second window, given the 
following settings in my program StarterPipeli

RE: A windows/trigger Question with kafkaIO over Spark Runner

2018-07-30 Thread linrick
Dear Nicolas,

Yes, I have set this configuration, as follows:

Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());

However, the result shows the following:
“1158  1000
1159  0
1160  0
1161  0
1162  0
1163  0
1164  1000
1165  0
1166  0
1167  0
1168  0
1169  0
1170  0
….”

Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner


Hello,

I think Spark has a default windowing strategy and pulls data from kafka every 
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,

Nicolas


From: linr...@itri.org.tw
Sent: Monday, 30 July 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner


Dear all



I have a question about the use of windows/triggers.

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==

My programs (KafkaToKafka.java and StarterPipeline.java) are available on 
GitHub: https://github.com/LinRick/beamkafkaIO

The configuration setting of Kafka broker is:

==

/kafka_broker/bin/kafka-producer-perf-test.sh \

--num-records 1000 \

--record-size 100 \

--topic kafkasink \

--throughput 1 \

--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==



The display of Kafka broker on console is as:

==

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...

==



We hope to see about 10,000 records in each one-second window, given the 
following settings in my program StarterPipeline.java:

==

…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(50)
  .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
…

==

The processed data will be imported into PostgreSQL.

The results in the DB are shown as follows.

224  3000

225  0

226  3000

227  0

228  0

236  0

237  0

238  5000



Unfortunately, the results we were hoping for are:

224  9000

225  11000

226  9505

227  9829

228  10001



I do not know how to deal with this situation; maybe it is related to data latency?



1. In addition, I am not sure whether this issue is about KafkaIO or whether my 
Spark runner settings are wrong, as in issue 
BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>



If any further information is needed, I will gladly provide it to you as soon as 
possible.



I would highly appreciate it if you could help me overcome this.



I am looking forward to hearing from you.



Sincerely yours,



Rick



A windows/trigger Question with kafkaIO over Spark Runner

2018-07-30 Thread linrick
Dear all

I have a question about the use of windows/triggers.
The following versions of related tools are set in my running program:
==
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==
My programs (KafkaToKafka.java and StarterPipeline.java) are available on 
GitHub: https://github.com/LinRick/beamkafkaIO
The configuration setting of Kafka broker is:
==
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 1000 \
--record-size 100 \
--topic kafkasink \
--throughput 1 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==

The display of Kafka broker on console is as:
==
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...
==

We hope to see about 10,000 records in each one-second window, given the 
following settings in my program StarterPipeline.java:
==
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).
withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(50)
  .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
…
==
The processed data will be imported into PostgreSQL.
The results in the DB are shown as follows.
224  3000
225  0
226  3000
227  0
228  0
236  0
237  0
238  5000

Unfortunately, the results we were hoping for are:
224  9000
225  11000
226  9505
227  9829
228  10001

I do not know how to deal with this situation; maybe it is related to data latency?

1. In addition, I am not sure whether this issue is about KafkaIO or whether my 
Spark runner settings are wrong, as in issue 
BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>

If any further information is needed, I will gladly provide it to you as soon as 
possible.

I would highly appreciate it if you could help me overcome this.

I am looking forward to hearing from you.

Sincerely yours,

Rick




RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

2018-06-20 Thread linrick
Hi,

I inserted a snippet into my code as follows:
…
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
  .triggering(AfterWatermark.pastEndOfWindow()
    .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
  .withAllowedLateness(Duration.standardSeconds(2))
  .discardingFiredPanes())

.apply(Count.perElement());

//"JdbcData to DB"
.apply(JdbcIO.<Long>write()
  .withDataSourceConfiguration
…

Rick

From: linr...@itri.org.tw [mailto:linr...@itri.org.tw]
Sent: Thursday, June 21, 2018 10:42 AM
To: user@beam.apache.org
Subject: RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Dear all,

The related version in my running env:
Beam 2.4.0 (Spark runner with Standalone mode)
Spark 2.0.0
Kafka: 2.11-0.10.1.1
, and the setting of Pom.xml as mentioned earlier.

When we run the following code:

args=new String[]{"--runner=SparkRunner","--sparkMaster=spark://ubuntu8:7077"};
SparkPipelineOptions 
options=PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);

PCollection readData = p

//"Use kafkaIO to inject data"
.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withMaxNumRecords(10)  // If I set this parameter, the code works; if I do not set this parameter, the executor of Spark will be closed.
    .withoutMetadata())
.apply(Values.create())

//"FixedWindows to collect data"
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.standardSeconds(2))
    .discardingFiredPanes())

//"JdbcData to DB"
.apply(JdbcIO.<Long>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "org.postgresql.Driver",
        "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
        .withUsername("postgres")
        .withPassword("postgres"))
    .withStatement("insert into kafkabeamdata (count) values(?)")
    .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Long>() {
        @Override
        public void setParameters(Long element, PreparedStatement query)
            throws SQLException {
          double count = element.doubleValue();
          query.setDouble(1, count);
        }
    }));

p.run();

However, this project needs to continuously ingest data from Kafka and write a 
result into the PostgreSQL DB every second.
If we set the parameter “withMaxNumRecords(10)”, the code works but writes a 
result only once, after the collected number of records reaches 10.

In addition, if I run this code under Beam (Spark runner with local[4]), it 
succeeds (data is ingested continuously).

Therefore, we are not sure whether this may be a bug in KafkaIO.

If any further information is needed, we will gladly provide it to you as soon as 
possible.

We are looking forward to hearing from you.

Thanks very much for your attention to our problem.

Sincerely yours,

Rick


From: linr...@itri.org.tw 
[mailto:linr...@itri.org.tw]
Sent: Thursday, June 14, 2018 2:48 PM
To: user@beam.apache.org
Subject: RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"


Hi all,



I have changed the versions of these tools to:



Beam 2.4.0 (Spark runner with Standalone mode)

Spark 2.0.0

Kafka: 2.11-0.10.1.1
Pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>





Here, there is a new error:



ubuntu9 worker log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/06/14 14:36:27 INFO CoarseGrainedExecutorBackend: Started daemon with 
process name: 22990@ubuntu9
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for TERM
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for HUP
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for INT
18/06/14 14:36:28 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
18/06/14 14:36:28 INFO SecurityManager: Changing view acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing view acls groups to:
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls groups to:

…

18/06/14 14:36:28 INFO DiskBlockManager: Created local directory at 
/tmp/spark-e035a190-2ab4-4167-b0c1-7868bac7afc1/executor-083a9db8-994a-4860-b2fa-d5f490bac01d/blockmgr-568096a9-2060-4717-a3e4-99d4abcee7bd

18/06/14 14:36:28 

RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

2018-06-20 Thread linrick
Dear all,

The related version in my running env:
Beam 2.4.0 (Spark runner with Standalone mode)
Spark 2.0.0
Kafka: 2.11-0.10.1.1
, and the setting of Pom.xml as mentioned earlier.

When we run the following code:

args=new String[]{"--runner=SparkRunner","--sparkMaster=spark://ubuntu8:7077"};
SparkPipelineOptions 
options=PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);

PCollection readData = p

//"Use kafkaIO to inject data"
.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withMaxNumRecords(10)  // If I set this parameter, the code works; if I do not set this parameter, the executor of Spark will be closed.
    .withoutMetadata())
.apply(Values.create())

//"FixedWindows to collect data"
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.standardSeconds(2))
    .discardingFiredPanes())

//"JdbcData to DB"
.apply(JdbcIO.<Long>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "org.postgresql.Driver",
        "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
        .withUsername("postgres")
        .withPassword("postgres"))
    .withStatement("insert into kafkabeamdata (count) values(?)")
    .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Long>() {
        @Override
        public void setParameters(Long element, PreparedStatement query)
            throws SQLException {
          double count = element.doubleValue();
          query.setDouble(1, count);
        }
    }));

p.run();

However, this project needs to continuously ingest data from Kafka and write a 
result into the PostgreSQL DB every second.
If we set the parameter “withMaxNumRecords(10)”, the code works but writes a 
result only once, after the collected number of records reaches 10.

In addition, if I run this code under Beam (Spark runner with local[4]), it 
succeeds (data is ingested continuously).

Therefore, we are not sure whether this may be a bug in KafkaIO.

If any further information is needed, we will gladly provide it to you as soon as 
possible.

We are looking forward to hearing from you.

Thanks very much for your attention to our problem.

Sincerely yours,

Rick


From: linr...@itri.org.tw [mailto:linr...@itri.org.tw]
Sent: Thursday, June 14, 2018 2:48 PM
To: user@beam.apache.org
Subject: RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"


Hi all,



I have changed the versions of these tools to:



Beam 2.4.0 (Spark runner with Standalone mode)

Spark 2.0.0

Kafka: 2.11-0.10.1.1
Pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>





Here, there is a new error:



ubuntu9 worker log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/06/14 14:36:27 INFO CoarseGrainedExecutorBackend: Started daemon with 
process name: 22990@ubuntu9
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for TERM
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for HUP
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for INT
18/06/14 14:36:28 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
18/06/14 14:36:28 INFO SecurityManager: Changing view acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing view acls groups to:
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls groups to:

…

18/06/14 14:36:28 INFO DiskBlockManager: Created local directory at 
/tmp/spark-e035a190-2ab4-4167-b0c1-7868bac7afc1/executor-083a9db8-994a-4860-b2fa-d5f490bac01d/blockmgr-568096a9-2060-4717-a3e4-99d4abcee7bd

18/06/14 14:36:28 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:28 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

B





Ubuntu8 worker log:

18/06/14 14:36:30 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:30 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
spark://CoarseGrainedScheduler@ubuntu7:43572

18/06/14 14:36:30 INFO WorkerWatcher: Connecting to worker 
spark://Worker@ubuntu8:39499

18/06/14 14:36:30 ERROR CoarseGrainedExecutorBackend: Cannot register with 
driver: spark://



I have no idea about the error.



In addition, the related Spark configurations are:



Master node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export 

RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

2018-06-14 Thread linrick
Hi all,



I have changed the versions of these tools to:



Beam 2.4.0 (Spark runner with Standalone mode)

Spark 2.0.0

Kafka: 2.11-0.10.1.1
Pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>





Here, there is a new error:



ubuntu9 worker log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/06/14 14:36:27 INFO CoarseGrainedExecutorBackend: Started daemon with 
process name: 22990@ubuntu9
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for TERM
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for HUP
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for INT
18/06/14 14:36:28 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
18/06/14 14:36:28 INFO SecurityManager: Changing view acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing view acls groups to:
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls groups to:

…

18/06/14 14:36:28 INFO DiskBlockManager: Created local directory at 
/tmp/spark-e035a190-2ab4-4167-b0c1-7868bac7afc1/executor-083a9db8-994a-4860-b2fa-d5f490bac01d/blockmgr-568096a9-2060-4717-a3e4-99d4abcee7bd

18/06/14 14:36:28 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:28 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

B





Ubuntu8 worker log:

18/06/14 14:36:30 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:30 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
spark://CoarseGrainedScheduler@ubuntu7:43572

18/06/14 14:36:30 INFO WorkerWatcher: Connecting to worker 
spark://Worker@ubuntu8:39499

18/06/14 14:36:30 ERROR CoarseGrainedExecutorBackend: Cannot register with 
driver: spark://



I have no idea about the error.



In addition, the related Spark configurations are:



Master node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory  10g

spark.executor.memory2g

spark.executor.instances   4



Worker node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory  10g

spark.executor.memory2g

spark.executor.instances   4



Rick



-Original Message-
From: Ismaël Mejía [mailto:ieme...@gmail.com]
Sent: Wednesday, June 13, 2018 11:35 PM
To: user@beam.apache.org
Subject: Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"



Can you please update the version of Beam to at least version 2.2.0.

There were some important fixes in streaming after the 2.0.0 release so this 
could be related. Ideally you should use the latest released version (2.4.0). 
Remember that starting with Beam 2.3.0 the Spark runner is based on Spark 2.



On Wed, Jun 13, 2018 at 5:11 PM Raghu Angadi <rang...@google.com> wrote:

>

> Can you check the logs on the worker?

>

> On Wed, Jun 13, 2018 at 2:26 AM <linr...@itri.org.tw> wrote:

>>

>> Dear all,

>>

>>

>>

>> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).

>>

>> My running environment is:

>>

>> OS: Ubuntu 14.04.3 LTS

>>

>> The different version for these tools:

>>

>> JAVA: JDK 1.8

>>

>> Beam 2.0.0 (Spark runner with Standalone mode)

>>

>> Spark 1.6.0

>>

>> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8;

>> Two worker nodes: ubuntu8 and ubuntu9

>>

>> Kafka: 2.10-0.10.1.1

>>

>>

>>

>> The java code of my project is:

>>

>> =

>> =

>>

>> SparkPipelineOptions options =

>> PipelineOptionsFactory.as(SparkPipelineOptions.class);

>>

>> options.setRunner(SparkRunner.class);

>>

>> options.setSparkMaster("spark://ubuntu8:7077");

>>

>> options.setAppName("App kafkaBeamTest");

>>

>> options.setJobName("Job kafkaBeamTest");

>>

>> options.setMaxRecordsPerBatch(1000L);

>>

>>

>>

>> Pipeline p = Pipeline.create(options);

>>

>>

>>

>> System.out.println("Beamtokafka");

>>

>> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()

>>

>> .withBootstrapServers(ubuntu7:9092)

>>

>> .withTopic("kafkasink")

>>

>> .withKeyDeserializer(LongDeserializer.class)

>>

>> .withValueDeserializer(StringDeserializer.class)

>>

>>.withoutMetadata()

>>

>>);

>>

>>

>>

>> PCollection<KV<Long, String>> readDivideData = readData.
>>
>> apply(Window.<KV<Long, String>>into(FixedWindows.of(Duration.standardSeconds(1)))

>>

>>  .triggering(AfterWatermark.pastEndOfWindow()

>>

>>

>> 

kafkaIO Run with Spark Runner: "streaming-job-executor-0"

2018-06-13 Thread linrick
Dear all,

I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Spark runner).
My running environment is:
OS: Ubuntu 14.04.3 LTS
The versions of these tools are:
JAVA: JDK 1.8
Beam 2.0.0 (Spark runner with Standalone mode)
Spark 1.6.0
Standalone mode :One driver node: ubuntu7; One master node: ubuntu8; Two worker 
nodes: ubuntu8 and ubuntu9
Kafka: 2.10-0.10.1.1

The java code of my project is:
==
SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setAppName("App kafkaBeamTest");
options.setJobName("Job kafkaBeamTest");
options.setMaxRecordsPerBatch(1000L);

Pipeline p = Pipeline.create(options);

System.out.println("Beamtokafka");
PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata()
    );

PCollection<KV<Long, String>> readDivideData = readData
    .apply(Window.<KV<Long, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

System.out.println("CountData");

PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());

p.run();
==

The message of error is:
==
Exception in thread "streaming-job-executor-0" java.lang.Error: 
java.lang.InterruptedException
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
…
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
==

Maven 3.5.0, in which related dependencies are listed in my project’s pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.1.1</version>
</dependency>



When I use the above code with the Spark runner (local[4]), this project works well 
(2,000~4,000 records/s). However, if I run it in Standalone mode, it fails with 
the above error.

If you have any idea about the error ("streaming-job-executor-0"), I am looking 
forward to hearing from you.

Note: the command line used is “./spark-submit --class 
com.itri.beam.kafkatest --master spark://ubuntu8:7077 
/root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”

Thanks

Rick






RE: The problem of kafkaIO sdk for data latency

2018-03-04 Thread linrick
Hi Raghu,

I changed my Beam version from 2.0.0 to 2.3.0, and it now works well.

Thanks for your help.

Now, I have another question about the data size in my window.

I am trying to keep a fixed data size in my window (i.e., a fixed 100 records per 
window).

However, I sometimes see that the number of samples is { 10, 40, 70, 100, 150, … }.

If anyone can provide any idea on how to set up my window, I would appreciate it.

The setting for my window is as:

Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterPane.elementCountAtLeast(100)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())

My ideal window is:
|window time=1s||window time=1s||window time=1s||window time=1s|
| data size=100 || data size=100 || data size=100 || data size=100 |
First trigger firing:  [1, 2, …, 100]
Second trigger firing: [101, 102, …, 200]
Third trigger firing:  [201, 202, …, 300]
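
As a side note, a count-based firing is sometimes written with a repeated element-count trigger in the global window instead of one-second fixed windows. The sketch below only illustrates that trigger API under the KV<String, String> element type assumed in this thread; it is not a confirmed fix for the irregular pane sizes described above, and elementCountAtLeast is a lower bound, so panes can still be larger than 100:

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Fire a pane roughly every 100 elements, independent of processing time.
Window<KV<String, String>> byCount =
    Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();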

Thanks

Rick


From: Raghu Angadi [mailto:rang...@google.com]
Sent: Saturday, March 03, 2018 5:52 AM
To: user 
Cc: 林良憲 
Subject: Re: The problem of kafkaIO sdk for data latency

I recently noticed that DirectRunner was leaking readers eventually crashing my 
pipeline. It is fixed in master (PR 
4658, version 2.4.0-SNAPSHOT). Can 
you try that? In my case the pipeline ran out of file descriptors.

Note that DirectRunner is not particularly optimized for runtime performance. 
It is often used for initial testing. Since the performance is alright for you 
initially, trying it out with master might help.

Note that TimestampedValue<> does not actually change the timestamp of the 
event. KafkaIO uses processing time for event time by default. Please see 
JavaDoc for KafkaIO for more options.
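
As a rough sketch of one such option, assuming a Beam release whose KafkaIO exposes withLogAppendTime() (check the KafkaIO JavaDoc for your version; the broker address and topic below just mirror the code in this thread):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

// Use the broker's log-append time as the record's event time instead of the
// default processing-time behavior.
KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
    .withBootstrapServers("127.0.0.1:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withLogAppendTime();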

On Wed, Feb 28, 2018 at 6:59 PM <linr...@itri.org.tw> wrote:
Dear all,

I am using the kafkaIO sdk in my project (Beam 2.0.0 with Direct runner).

With this SDK, there is a situation involving data latency, described in the 
following.

The data come from Kafka at a fixed rate: 100 records per second.

I create a fixed 1-second window without delay. I found that the data size per 
window is 70, 80, 104, or greater than or equal to 104.

After one day, data latency appears at runtime, and the data size drops to only 
10 per window.

In order to explain it clearly, I also provide my code in the following.
" PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);

PCollection<TimestampedValue<KV<String, String>>> readData =
  p.apply(KafkaIO.<String, String>read()
     .withBootstrapServers("127.0.0.1:9092")
     .withTopic("kafkasink")
     .withKeyDeserializer(StringDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     .withoutMetadata())
   .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
    @ProcessElement
    public void test(ProcessContext c) throws ParseException {
    String element = c.element().getValue();
    try {
      JsonNode arrNode = new ObjectMapper().readTree(element);
      String t = arrNode.path("v").findValue("Timestamp").textValue();
      DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/ HH:mm:ss.");
      LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
      java.time.Instant java_instant = dateTime.atZone(ZoneId.systemDefault()).toInstant();
      Instant timestamp = new Instant(java_instant.toEpochMilli());
      c.output(TimestampedValue.of(c.element(), timestamp));
    } catch (JsonGenerationException e) {
      e.printStackTrace();
    } catch (JsonMappingException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
    }}));

PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
  Window.<TimestampedValue<KV<String, String>>>into(FixedWindows.of(Duration.standardSeconds(1))
      .withOffset(Duration.ZERO))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());"

In addition, the running result is as shown in the following.
"data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999

data-size=70
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999

data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999

After one 

The problem of kafkaIO sdk for data latency

2018-02-28 Thread linrick
Dear all,

I am using the kafkaIO sdk in my project (Beam 2.0.0 with Direct runner).

With this SDK, there is a situation involving data latency, described in the 
following.

The data come from Kafka at a fixed rate: 100 records per second.

I create a fixed 1-second window without delay. I found that the data size per 
window is 70, 80, 104, or greater than or equal to 104.

After one day, data latency appears at runtime, and the data size drops to only 
10 per window.

In order to explain it clearly, I also provide my code in the following.
" PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);

PCollection<TimestampedValue<KV<String, String>>> readData =
  p.apply(KafkaIO.<String, String>read()
     .withBootstrapServers("127.0.0.1:9092")
     .withTopic("kafkasink")
     .withKeyDeserializer(StringDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     .withoutMetadata())
   .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
    @ProcessElement
    public void test(ProcessContext c) throws ParseException {
    String element = c.element().getValue();
    try {
      JsonNode arrNode = new ObjectMapper().readTree(element);
      String t = arrNode.path("v").findValue("Timestamp").textValue();
      DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/ HH:mm:ss.");
      LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
      java.time.Instant java_instant = dateTime.atZone(ZoneId.systemDefault()).toInstant();
      Instant timestamp = new Instant(java_instant.toEpochMilli());
      c.output(TimestampedValue.of(c.element(), timestamp));
    } catch (JsonGenerationException e) {
      e.printStackTrace();
    } catch (JsonMappingException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
    }}));

PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
  Window.<TimestampedValue<KV<String, String>>>into(FixedWindows.of(Duration.standardSeconds(1))
      .withOffset(Duration.ZERO))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());"

In addition, the running result is as shown in the following.
"data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999

data-size=70
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999

data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999

After one day:
data-size=10
coming-data-time=2018-02-28 02:05:48.217
window-time=2018-03-01 10:35:16.999 "

To reproduce my situation, my running environment is:
OS: Ubuntu 14.04.3 LTS

JAVA: JDK 1.7

Beam 2.0.0 (with Direct runner)

Kafka 2.10-0.10.1.1

Maven 3.5.0, in which dependencies are listed in pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.0.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>


If you have any idea about the problem (data latency), I am looking forward to 
hearing from you.

Thanks

Rick




RE: Regarding Beam Slack Channel

2017-12-01 Thread linrick
Hi
Can I receive this invitation, too?

Thanks
Rick

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Friday, December 01, 2017 12:53 PM
To: user@beam.apache.org
Subject: Re: Regarding Beam Slack Channel

Invite sent as well.

Regards
JB

On 11/30/2017 07:19 PM, Yanael Barbier wrote:
> Hello
> Can I get an invite too?
>
> Thanks,
> Yanael
>
> On Thu, Nov 30, 2017 at 19:15, Wesley Tanaka wrote:
>
> Invite sent
>
>
> On 11/30/2017 08:11 AM, Nalseez Duke wrote:
>> Hello
>>
>> Can someone please add me to the Beam slack channel?
>>
>> Thanks.
>
>
> --
> Wesley Tanaka
> https://wtanaka.com/
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Can we transform PCollection to ArrayList?

2017-10-31 Thread linrick
Hi all,

I am Rick

I would like to transform the datatype from PCollection to ArrayList, and I am not 
sure whether my approach is right.

My running environment is Java 1.8 and Beam 2.1.0.

My Java code is as follows:
ArrayList<Integer> myList = new ArrayList<>();

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<Integer> data = p.apply("data", Create.of(1, 2, 3, 4, 5));
PCollection<Integer> newdata = data.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c)
{
int datavalue = c.element() + 1;
System.out.println("data=" + datavalue);
   c.output(datavalue);
}
}));

p.run();

If you could share any ideas with me, I would highly appreciate it.

Thanks

Rick




RE: How to use ConsoleIO SDK

2017-10-23 Thread linrick
Hi,

If it's all right with you, I wanna try it.

Thanks

Sincerely
Rick

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Friday, October 20, 2017 1:32 PM
To: user@beam.apache.org
Subject: Re: How to use ConsoleIO SDK

Hi,

I started to work on the ConsoleIO (and SocketIO too), but it's not yet merged.

I can provide a SNAPSHOT to you if you wanna try.

Regards
JB

On 10/20/2017 04:14 AM, linr...@itri.org.tw wrote:
> Dear sir,
>
> I have a question about how to use the Beam Java SDK ConsoleIO.
>
> My objective (highlighted in yellow in the original message) is to write the PCollection
> ”data” to the console and then use it (type: RDD ??) as another variable to do
> other work.
>
> If any further information is needed, I will gladly provide it to you as
> soon as possible.
>
> I am looking forward to hearing from you.
>
> My java code is as:
>
> “
>
> import java.io.IOException;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.runners.spark.SparkRunner;
> import org.apache.beam.runners.spark.io.ConsoleIO;
> import org.apache.beam.runners.spark.SparkPipelineOptions;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TimestampedValue;
> import javafx.util.Pair;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import org.joda.time.MutableDateTime;
>
> public static void main(String[] args) throws IOException {
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
>     Instant starttime = mutableNow.toInstant().plus(8*60*60*1000);
>     int min;
>     int sec;
>     int millsec;
>     min = 2;
>     sec = min*60;
>     millsec = sec*1000;
>     double[] value = new double[] {1.0, 2.0, 3.0, 4.0, 5.0};
>     List<TimestampedValue<KV<String, Pair<Integer, Double>>>> dataList = new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>     for (int i = 0; i < n; i++)
>     {
>         count = count + 1;
>         if (i <= 3)
>         {
>             Instant M1_time = starttime.plus(millsec*count);
>             dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M1_time));
>         }
>         else if (4 <= i && i < 5)
>         {
>             Instant M2_time = starttime.plus(millsec*count);
>             dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M2_time));
>         }
>         else
>         {
>             Instant M3_time = starttime.plus(millsec*count);
>             dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M3_time));
>         }
>         System.out.println("raw_data=" + dataList.get(i));
>     }
>
>     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
>     options.setRunner(SparkRunner.class);
>     options.setSparkMaster("local[4]");
>     Pipeline p = Pipeline.create(options);
>
>     PCollection<KV<String, Pair<Integer, Double>>> data = p.apply("create data with time", Create.timestamped(dataList));
>
>     data.apply("spark_write_on_console", ConsoleIO.Write.out);
>
>     p.run().waitUntilFinish();
>
> ”
>
> Thanks very much
>
> Sincerely yours,
>
> Liang-Sian Lin, Dr.
>
> Oct 20 2017
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀
> 此信件。 This email may contain confidential information. Please do not
> use or disclose it in any way and delete it if you are not the intended 
> recipient.

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




How to use ConsoleIO SDK

2017-10-19 Thread linrick
Dear sir,

I have a question about how to use the Beam Java SDK ConsoleIO.

My objective (highlighted in yellow in the original message) is to write the PCollection ”data” 
to the console and then use it (type: RDD ??) as another variable to do other work.

If any further information is needed, I will gladly provide it to you as soon as 
possible.

I am looking forward to hearing from you.

My java code is as:
“

import java.io.IOException;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.SparkPipelineOptions;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;

import javafx.util.Pair;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;

public static void main(String[] args) throws IOException  {
   MutableDateTime mutableNow = Instant.now().toMutableDateTime();
   mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
   Instant starttime = mutableNow.toInstant().plus(8*60*60*1000);
   int min;
   int sec;
   int millsec;
   min=2;
   sec=min*60;
   millsec=sec*1000;
   double[] value=new double[] {1.0,2.0,3.0,4.0,5.0};
   List<TimestampedValue<KV<String, Pair<Integer, Double>>>> dataList = new ArrayList<>();
   int n = value.length;
   int count = 0;
   for (int i = 0; i < n; i++) {
      count = count + 1;
      if (i <= 3)
      {
         Instant M1_time = starttime.plus(millsec*count);
         dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M1_time));
      }
      else if (4 <= i && i < 5)
      {
         Instant M2_time = starttime.plus(millsec*count);
         dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M2_time));
      }
      else
      {
         Instant M3_time = starttime.plus(millsec*count);
         dataList.add(TimestampedValue.of(KV.of("M1", new Pair<Integer, Double>(i, value[i])), M3_time));
      }
      System.out.println("raw_data=" + dataList.get(i));
}

   SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
   options.setRunner(SparkRunner.class);
   options.setSparkMaster("local[4]");
   Pipeline p = Pipeline.create(options);

   PCollection<KV<String, Pair<Integer, Double>>> data = p.apply("create data with time", Create.timestamped(dataList));


data.apply("spark_write_on_console",ConsoleIO.Write.out);

   p.run().waitUntilFinish();
”

Thanks very much

Sincerely yours,
Liang-Sian Lin, Dr.
Oct 20 2017

