This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new f57cb9c208d  [CdapIO] Add integration tests for SparkReceiverIO (#23305)
f57cb9c208d is described below

commit f57cb9c208d9e1ac2e353c3a2d2be2b5b3ccee4a
Author: Vitaly Terentyev <vitaly.terent...@akvelon.com>
AuthorDate: Wed Nov 2 20:15:50 2022 +0400

    [CdapIO] Add integration tests for SparkReceiverIO (#23305)

    * Add integration tests for SparkReceiverIO
    * Refactoring. Resolve comments.
    * Add CustomOffsetRangeTracker. Fix reading from RabbitMqReceiver
    * Fix compile
    * Fix rabbitmq yaml file
    * Refactoring
    * Add tests for SDF

    Co-authored-by: Sorokin Andrey <sorokin1...@gmail.com>
---
 .test-infra/jenkins/README.md                      |   1 +
 .../job_PerformanceTests_SparkReceiverIO_IT.groovy |  84 +++++
 .test-infra/kubernetes/rabbit/rabbitmq.yaml        | 187 +++++++++++
 .../Java_IO_IT_Tests_Dataflow.json                 | 123 +++++++
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/java/io/sparkreceiver/README.md               |  38 +++
 sdks/java/io/sparkreceiver/build.gradle            |  11 +-
 .../ReadFromSparkReceiverWithOffsetDoFn.java       | 106 +++++-
 .../sparkreceiver/RabbitMqReceiverWithOffset.java  | 163 ++++++++++
 .../ReadFromSparkReceiverWithOffsetDoFnTest.java   | 145 +++++++++
 .../sdk/io/sparkreceiver/SparkReceiverIOIT.java    | 354 +++++++++++++++++++++
 11 files changed, 1193 insertions(+), 20 deletions(-)

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index e547c23f24f..e53dae86c45 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -155,6 +155,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
 | beam_PerformanceTests_PubsubIOIT_Python_Streaming | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming_PR/) | `Run PubsubIO Performance Test Python` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming) |
 | beam_PerformanceTests_SpannerIO_Read_2GB_Python | [cron](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_PR/) | `Run SpannerIO Read 2GB Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python/badge/icon)](https://ci-beam.apache.o [...]
 | beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch_PR/) | `Run SpannerIO Write 2GB Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_Per [...]
+| beam_PerformanceTests_SparkReceiverIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SparkReceiverIOIT/) | `Run Java SparkReceiverIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SparkReceiverIO/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Spa [...]
 | beam_PerformanceTests_TFRecordIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java TFRecordIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
 | beam_PerformanceTests_TextIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/) | `Run Java TextIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://ci-be [...]
 | beam_PerformanceTests_WordCountIT_Py37 | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/) | `Run Python37 WordCountIT Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37) |

diff --git a/.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy
new file mode 100644
index 00000000000..0bfb01b43ce
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import Kubernetes
+import InfluxDBCredentialsHelper
+
+String jobName = "beam_PerformanceTests_SparkReceiver_IO"
+
+/**
+ * This job runs the SparkReceiver IO performance tests.
+ * It runs on a RabbitMQ cluster that is built by applying the folder .test-infra/kubernetes/rabbit,
+ * in an existing Kubernetes cluster (DEFAULT_CLUSTER in Kubernetes.groovy).
+ * The services created to run this test are:
+ *   Pods: 1 RabbitMQ pod
+ *   Services: 1 broker
+ * When the performance tests finish, all resources are cleaned up by a postBuild step in Kubernetes.groovy.
+ */
+job(jobName) {
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java SparkReceiverIO Performance Test',
+      'Run Java SparkReceiverIO Performance Test')
+  InfluxDBCredentialsHelper.useCredentials(delegate)
+
+  String namespace = common.getKubernetesNamespace(jobName)
+  String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
+  Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
+
+  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/rabbit/rabbitmq.yaml"))
+  String rabbitMqHostName = "LOAD_BALANCER_IP"
+  k8s.loadBalancerIP("rabbitmq", rabbitMqHostName)
+
+  Map pipelineOptions = [
+    tempRoot                      : 'gs://temp-storage-for-perf-tests',
+    project                       : 'apache-beam-testing',
+    runner                        : 'DataflowRunner',
+    sourceOptions                 : """
+                                      {
+                                        "numRecords": "600000",
+                                        "keySizeBytes": "1",
+                                        "valueSizeBytes": "90"
+                                      }
+                                    """.trim().replaceAll("\\s", ""),
+    bigQueryDataset               : 'beam_performance',
+    bigQueryTable                 : 'sparkreceiverioit_results',
+    influxMeasurement             : 'sparkreceiverioit_results',
+    influxDatabase                : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influxHost                    : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    rabbitMqBootstrapServerAddress: "amqp://guest:guest@\$${rabbitMqHostName}:5672",
+    streamName                    : 'rabbitMqTestStream',
+    readTimeout                   : '900',
+    numWorkers                    : '5',
+    autoscalingAlgorithm          : 'NONE'
+  ]
+
+  steps {
+    gradle {
+      rootBuildScriptDir(common.checkoutDir)
+      common.setGradleSwitches(delegate)
+      switches("--info")
+      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(pipelineOptions)}\'")
+      switches("-DintegrationTestRunner=dataflow")
+      tasks(":sdks:java:io:sparkreceiver:integrationTest --tests org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIOIT")
+    }
+  }
+}

diff --git a/.test-infra/kubernetes/rabbit/rabbitmq.yaml b/.test-infra/kubernetes/rabbit/rabbitmq.yaml
new file mode 100644
index 00000000000..72bd41d5a92
--- /dev/null
+++ b/.test-infra/kubernetes/rabbit/rabbitmq.yaml
@@ -0,0 +1,187 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: rabbitmq +--- +kind: Service +apiVersion: v1 +metadata: + name: rabbitmq-internal + labels: + app: rabbitmq +spec: + clusterIP: None + ports: + - name: http + protocol: TCP + port: 15672 + - name: amqp + protocol: TCP + port: 5672 + selector: + app: rabbitmq +--- +kind: Service +apiVersion: v1 +metadata: + name: rabbitmq + labels: + app: rabbitmq + type: LoadBalancer +spec: + type: LoadBalancer + ports: + - name: http + protocol: TCP + port: 15672 + - name: amqp + protocol: TCP + port: 5672 + selector: + app: rabbitmq +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbitmq-config +data: + enabled_plugins: | + [rabbitmq_management,rabbitmq_peer_discovery_k8s]. + + rabbitmq.conf: | + loopback_users = none + cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s + cluster_formation.k8s.host = kubernetes.default.svc.cluster.local + cluster_formation.k8s.port = 443 + ### cluster_formation.k8s.address_type = ip + cluster_formation.k8s.address_type = hostname + cluster_formation.node_cleanup.interval = 10 + cluster_formation.node_cleanup.only_log_warning = true + cluster_partition_handling = autoheal + queue_master_locator=min-masters + cluster_formation.randomized_startup_delay_range.min = 0 + cluster_formation.randomized_startup_delay_range.max = 2 + cluster_formation.k8s.service_name = rabbitmq-internal + cluster_formation.k8s.hostname_suffix = .rabbitmq-internal.our-namespace.svc.cluster.local +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: rabbitmq +spec: + selector: + matchLabels: + app: "rabbitmq" + serviceName: rabbitmq-internal + replicas: 1 + volumeClaimTemplates: + - metadata: + name: rabbitmq-data + namespace: rabbit-test + spec: + storageClassName: standard + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "3Gi" + template: + metadata: + labels: + app: rabbitmq + annotations: + scheduler.alpha.kubernetes.io/affinity: > + { + "podAntiAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": [{ + "labelSelector": { + "matchExpressions": [{ + "key": "app", + "operator": "In", + "values": ["rabbitmq"] + }] + }, + "topologyKey": "kubernetes.io/hostname" + }] + } + } + spec: + serviceAccountName: rabbitmq + terminationGracePeriodSeconds: 10 + containers: + - name: rabbitmq-k8s + image: rabbitmq:3.7 + volumeMounts: + - name: config-volume + mountPath: /etc/rabbitmq + - name: rabbitmq-data + mountPath: /var/lib/rabbitmq/mnesia + ports: + - name: http + protocol: TCP + containerPort: 15672 + - name: amqp + protocol: TCP + containerPort: 5672 + livenessProbe: + exec: + command: ["rabbitmqctl", "status"] + initialDelaySeconds: 60 + periodSeconds: 10 + timeoutSeconds: 10 + readinessProbe: + exec: + command: ["rabbitmqctl", "status"] + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 10 + imagePullPolicy: Always + env: + - name: MY_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: HOSTNAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: RABBITMQ_USE_LONGNAME + value: "true" + - name: RABBITMQ_NODENAME + value: "rabbit@$(HOSTNAME).rabbitmq-internal.$(NAMESPACE).svc.cluster.local" + - name: K8S_SERVICE_NAME + value: "rabbitmq-internal" + - name: RABBITMQ_ERLANG_COOKIE + value: "cookie" + volumes: + - name: config-volume + configMap: + name: rabbitmq-config + items: + - key: rabbitmq.conf + path: rabbitmq.conf + - key: enabled_plugins + path: 
enabled_plugins + - name: rabbitmq-data + persistentVolumeClaim: + claimName: rabbitmq-data diff --git a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json index 1c6cde8bbb7..733de44551c 100644 --- a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json +++ b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json @@ -2717,6 +2717,129 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "cacheTimeout": null, + "dashLength": 10, + "dashes": false, + "datasource": "BeamInfluxDB", + "description": "", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 106 + }, + "hiddenSeries": false, + "id": 27, + "interval": "6h", + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pluginVersion": "6.7.2", + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "$tag_metric", + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + } + ], + "measurement": "", + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"value\") FROM \"sparkreceiverioit_results\" WHERE \"metric\" =~ /time/ AND $timeFilter GROUP BY time($__interval), \"metric\"", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "SparkReceiverIO", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:403", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:404", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "refresh": false, diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 6766db5d509..6aa2e4859c5 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -726,6 +726,7 @@ class BeamModulePlugin implements Plugin<Project> { testcontainers_postgresql : "org.testcontainers:postgresql:$testcontainers_version", testcontainers_mysql : "org.testcontainers:mysql:$testcontainers_version", testcontainers_gcloud : "org.testcontainers:gcloud:$testcontainers_version", + testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version", vendored_grpc_1_48_1 : "org.apache.beam:beam-vendor-grpc-1_48_1:0.1", vendored_guava_26_0_jre : "org.apache.beam:beam-vendor-guava-26_0-jre:0.1", vendored_calcite_1_28_0 : 
"org.apache.beam:beam-vendor-calcite-1_28_0:0.2", diff --git a/sdks/java/io/sparkreceiver/README.md b/sdks/java/io/sparkreceiver/README.md new file mode 100644 index 00000000000..6ce48efd58f --- /dev/null +++ b/sdks/java/io/sparkreceiver/README.md @@ -0,0 +1,38 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +SparkReceiverIO contains I/O transforms which allow you to read messages from Spark Receiver (org.apache.spark.streaming.receiver.Receiver). + +## Dependencies + +To use SparkReceiverIO you must first add a dependency on `beam-sdks-java-io-sparkreceiver`. + +```maven +<dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-sparkreceiver</artifactId> + <version>...</version> +</dependency> +``` + +## Documentation + +The documentation is maintained in JavaDoc for SparkReceiverIO class. It includes +usage examples and primary concepts. +- [SparkReceiverIO.java](src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java) diff --git a/sdks/java/io/sparkreceiver/build.gradle b/sdks/java/io/sparkreceiver/build.gradle index fb1f681d926..52c6a634049 100644 --- a/sdks/java/io/sparkreceiver/build.gradle +++ b/sdks/java/io/sparkreceiver/build.gradle @@ -33,9 +33,10 @@ ext.summary = """Apache Beam SDK provides a simple, Java-based interface for streaming integration with CDAP plugins.""" configurations.all { - exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'ch.qos.logback', module: 'logback-classic' exclude group: 'org.slf4j', module: 'slf4j-jdk14' - exclude group: 'org.slf4j', module: 'slf4j-simple' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.slf4j', module: 'slf4j-reload4j' } dependencies { @@ -48,6 +49,10 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") compileOnly "org.scala-lang:scala-library:2.11.12" testImplementation library.java.junit + testImplementation library.java.testcontainers_rabbitmq testImplementation project(path: ":runners:direct-java", configuration: "shadow") - testImplementation project(path: ":examples:java", configuration: "testRuntimeMigration") + testImplementation project(":sdks:java:io:synthetic") + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") + testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration") + testImplementation "com.rabbitmq:amqp-client:5.16.0" } diff --git a/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java index 2eb46561e2c..8b2fdcb01ad 100644 --- 
diff --git a/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
index 2eb46561e2c..8b2fdcb01ad 100644
--- a/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
+++ b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.sparkreceiver;
 
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -60,7 +63,7 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
       LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
 
   /** Constant waiting time after the {@link Receiver} starts. Required to prepare for polling */
-  private static final int START_POLL_TIMEOUT_MS = 1000;
+  private static final int START_POLL_TIMEOUT_MS = 2000;
 
   private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
       createWatermarkEstimatorFn;
@@ -108,10 +111,68 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
     return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
   }
 
+  /**
+   * {@link OffsetRangeTracker} that performs a basic split only in {@link
+   * OffsetRangeTracker#checkDone}. This behavior allows reading from the primary range until
+   * resume, and then splitting into {alreadyReadRange, residualRange}.
+   */
+  private static class CustomOffsetRangeTracker extends OffsetRangeTracker {
+
+    public CustomOffsetRangeTracker(OffsetRange range) {
+      super(range);
+    }
+
+    @SuppressWarnings("nullness") // Base method can return null
+    @Override
+    public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+      if (lastAttemptedOffset != null) {
+        if (range.getTo() == Long.MAX_VALUE) {
+          // Do not split, just use primary range
+          return null;
+        } else {
+          // Need to add residual range
+          OffsetRange res = new OffsetRange(range.getTo(), Long.MAX_VALUE);
+          this.range = new OffsetRange(range.getFrom(), range.getTo());
+          return SplitResult.of(range, res);
+        }
+      }
+      // Basic split logic when lastAttemptedOffset is null
+
+      // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+      // precision.
+ BigDecimal cur = + BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128); + // split = cur + max(1, (range.getTo() - cur) * fractionOfRemainder) + BigDecimal splitPos = + cur.add( + BigDecimal.valueOf(range.getTo()) + .subtract(cur, MathContext.DECIMAL128) + .multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128) + .max(BigDecimal.ONE), + MathContext.DECIMAL128); + + long split = splitPos.longValue(); + if (split >= range.getTo()) { + return null; + } + OffsetRange res = new OffsetRange(split, range.getTo()); + this.range = new OffsetRange(range.getFrom(), split); + return SplitResult.of(range, res); + } + + @Override + public void checkDone() throws IllegalStateException { + if (lastAttemptedOffset != null && range.getTo() == Long.MAX_VALUE) { + // Perform basic split + super.trySplit(0); + } + } + } + @NewTracker public OffsetRangeTracker restrictionTracker( @Element byte[] element, @Restriction OffsetRange restriction) { - return new OffsetRangeTracker(restriction); + return new CustomOffsetRangeTracker(restriction); } @GetRestrictionCoder @@ -223,27 +284,38 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> { LOG.error("Can not build Spark Receiver", e); throw new IllegalStateException("Spark Receiver was not built!"); } + LOG.debug("Restriction {}", tracker.currentRestriction().toString()); sparkConsumer = new SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom()); sparkConsumer.start(sparkReceiver); - while (sparkConsumer.hasRecords()) { - V record = sparkConsumer.poll(); - if (record != null) { - Long offset = getOffsetFn.apply(record); - if (!tracker.tryClaim(offset)) { - sparkConsumer.stop(); - LOG.debug("Stop for restriction: {}", tracker.currentRestriction().toString()); - return ProcessContinuation.stop(); + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(START_POLL_TIMEOUT_MS); + } catch (InterruptedException e) { + LOG.error("SparkReceiver was interrupted before polling started", e); + throw new IllegalStateException("Spark Receiver was interrupted before polling started"); + } + if (!sparkConsumer.hasRecords()) { + sparkConsumer.stop(); + tracker.checkDone(); + LOG.debug("Resume for restriction: {}", tracker.currentRestriction().toString()); + return ProcessContinuation.resume(); + } + while (sparkConsumer.hasRecords()) { + V record = sparkConsumer.poll(); + if (record != null) { + Long offset = getOffsetFn.apply(record); + if (!tracker.tryClaim(offset)) { + sparkConsumer.stop(); + LOG.debug("Stop for restriction: {}", tracker.currentRestriction().toString()); + return ProcessContinuation.stop(); + } + Instant currentTimeStamp = getTimestampFn.apply(record); + ((ManualWatermarkEstimator<Instant>) watermarkEstimator).setWatermark(currentTimeStamp); + receiver.outputWithTimestamp(record, currentTimeStamp); } - Instant currentTimeStamp = getTimestampFn.apply(record); - ((ManualWatermarkEstimator<Instant>) watermarkEstimator).setWatermark(currentTimeStamp); - System.err.println(record); - receiver.outputWithTimestamp(record, currentTimeStamp); } } - sparkConsumer.stop(); - LOG.debug("Resume for restriction: {}", tracker.currentRestriction().toString()); - return ProcessContinuation.resume(); } private static Instant ensureTimestampWithinBounds(Instant timestamp) { diff --git a/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java new file mode 100644 index 00000000000..362e6280eb2 --- /dev/null +++ b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.sparkreceiver; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Imitation of Spark {@link Receiver} for RabbitMQ that implements {@link HasOffset} interface. + * Used to test {@link SparkReceiverIO#read()}. + */ +class RabbitMqReceiverWithOffset extends Receiver<String> implements HasOffset { + + private static final Logger LOG = LoggerFactory.getLogger(RabbitMqReceiverWithOffset.class); + private static final int MAX_PREFETCH_COUNT = 65535; + + private final String rabbitmqUrl; + private final String streamName; + private final long totalMessagesNumber; + private long startOffset; + private static final int READ_TIMEOUT_IN_MS = 100; + + RabbitMqReceiverWithOffset( + final String uri, final String streamName, final long totalMessagesNumber) { + super(StorageLevel.MEMORY_AND_DISK_2()); + rabbitmqUrl = uri; + this.streamName = streamName; + this.totalMessagesNumber = totalMessagesNumber; + } + + @Override + public void setStartOffset(Long startOffset) { + this.startOffset = startOffset != null ? 
startOffset : 0; + } + + @Override + public Long getEndOffset() { + return Long.MAX_VALUE; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void onStart() { + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build()).submit(this::receive); + } + + @Override + public void onStop() {} + + private void receive() { + long currentOffset = startOffset; + + final TestConsumer testConsumer; + final Connection connection; + final Channel channel; + + try { + LOG.info("Starting receiver with offset {}", currentOffset); + final ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(rabbitmqUrl); + connectionFactory.setAutomaticRecoveryEnabled(true); + connectionFactory.setConnectionTimeout(600000); + connectionFactory.setNetworkRecoveryInterval(5000); + connectionFactory.setRequestedHeartbeat(60); + connectionFactory.setTopologyRecoveryEnabled(true); + connectionFactory.setRequestedChannelMax(0); + connectionFactory.setRequestedFrameMax(0); + connection = connectionFactory.newConnection(); + + channel = connection.createChannel(); + channel.queueDeclare( + streamName, true, false, false, Collections.singletonMap("x-queue-type", "stream")); + channel.basicQos(Math.min(MAX_PREFETCH_COUNT, (int) totalMessagesNumber)); + testConsumer = new TestConsumer(this, channel, this::store); + + channel.basicConsume( + streamName, + false, + Collections.singletonMap("x-stream-offset", currentOffset), + testConsumer); + } catch (Exception e) { + LOG.error("Can not basic consume", e); + throw new RuntimeException(e); + } + + while (!isStopped()) { + try { + TimeUnit.MILLISECONDS.sleep(READ_TIMEOUT_IN_MS); + } catch (InterruptedException e) { + LOG.error("Interrupted", e); + } + } + + try { + LOG.info("Stopping receiver"); + channel.close(); + connection.close(); + } catch (TimeoutException | IOException e) { + throw new RuntimeException(e); + } + } + + /** A simple RabbitMQ {@code Consumer}. */ + static class TestConsumer extends DefaultConsumer { + + private final java.util.function.Consumer<String> messageConsumer; + private final Receiver<String> receiver; + + public TestConsumer( + Receiver<String> receiver, + Channel channel, + java.util.function.Consumer<String> messageConsumer) { + super(channel); + this.receiver = receiver; + this.messageConsumer = messageConsumer; + } + + @Override + public void handleDelivery( + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + try { + final String sMessage = new String(body, StandardCharsets.UTF_8); + LOG.trace("Adding message to consumer: {}", sMessage); + messageConsumer.accept(sMessage); + if (getChannel().isOpen() && !receiver.isStopped()) { + getChannel().basicAck(envelope.getDeliveryTag(), false); + } + } catch (Exception e) { + LOG.error("Can't read from RabbitMQ: {}", e.getMessage()); + } + } + } +} diff --git a/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java new file mode 100644 index 00000000000..67b4e2cabba --- /dev/null +++ b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.sparkreceiver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.joda.time.Instant; +import org.junit.Test; + +/** Test class for {@link ReadFromSparkReceiverWithOffsetDoFn}. */ +public class ReadFromSparkReceiverWithOffsetDoFnTest { + + private static final byte[] TEST_ELEMENT = new byte[] {}; + + private final ReadFromSparkReceiverWithOffsetDoFn<String> dofnInstance = + new ReadFromSparkReceiverWithOffsetDoFn<>(makeReadTransform()); + + private SparkReceiverIO.Read<String> makeReadTransform() { + ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder = + new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs(); + return SparkReceiverIO.<String>read() + .withSparkReceiverBuilder(receiverBuilder) + .withGetOffsetFn(Long::valueOf) + .withTimestampFn(Instant::parse); + } + + private static class MockOutputReceiver implements DoFn.OutputReceiver<String> { + + private final List<String> records = new ArrayList<>(); + + @Override + public void output(String output) {} + + @Override + public void outputWithTimestamp( + String output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) { + records.add(output); + } + + public List<String> getOutputs() { + return this.records; + } + } + + private final ManualWatermarkEstimator<Instant> mockWatermarkEstimator = + new ManualWatermarkEstimator<Instant>() { + + @Override + public void setWatermark(Instant watermark) { + // do nothing + } + + @Override + public Instant currentWatermark() { + return null; + } + + @Override + public Instant getState() { + return null; + } + }; + + private List<String> createExpectedRecords(int numRecords) { + List<String> records = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + records.add(String.valueOf(i)); + } + return records; + } + + @Test + public void testInitialRestriction() { + long expectedStartOffset = 0L; + OffsetRange result = dofnInstance.initialRestriction(TEST_ELEMENT); + assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result); + } + + @Test + public void testRestrictionTrackerSplit() { + OffsetRangeTracker offsetRangeTracker = + dofnInstance.restrictionTracker( + 
TEST_ELEMENT, dofnInstance.initialRestriction(TEST_ELEMENT)); + assertEquals(0L, offsetRangeTracker.currentRestriction().getFrom()); + assertEquals(Long.MAX_VALUE, offsetRangeTracker.currentRestriction().getTo()); + + assertEquals( + SplitResult.of(new OffsetRange(0, 0), new OffsetRange(0, Long.MAX_VALUE)), + offsetRangeTracker.trySplit(0d)); + + offsetRangeTracker = + dofnInstance.restrictionTracker( + TEST_ELEMENT, dofnInstance.initialRestriction(TEST_ELEMENT)); + + assertTrue(offsetRangeTracker.tryClaim(0L)); + assertNull(offsetRangeTracker.trySplit(0d)); + + offsetRangeTracker.checkDone(); + assertEquals( + SplitResult.of(new OffsetRange(0, 1), new OffsetRange(1, Long.MAX_VALUE)), + offsetRangeTracker.trySplit(0d)); + } + + @Test + public void testProcessElement() { + MockOutputReceiver receiver = new MockOutputReceiver(); + DoFn.ProcessContinuation result = + dofnInstance.processElement( + TEST_ELEMENT, + dofnInstance.restrictionTracker( + TEST_ELEMENT, dofnInstance.initialRestriction(TEST_ELEMENT)), + mockWatermarkEstimator, + receiver); + assertEquals(DoFn.ProcessContinuation.resume(), result); + assertEquals( + createExpectedRecords(CustomReceiverWithOffset.RECORDS_COUNT), receiver.getOutputs()); + } +} diff --git a/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java new file mode 100644 index 00000000000..b335aab2ed5 --- /dev/null +++ b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.common.IOITHelper;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * IO Integration test for {@link org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO}.
+ *
+ * <p>See https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests for
+ * more details.
+ *
+ * <p>NOTE: This test sets the retention policy of the messages so that all messages are retained
+ * in the stream and can be read back after writing.
+ */ +@RunWith(JUnit4.class) +public class SparkReceiverIOIT { + + private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class); + + private static final String READ_TIME_METRIC_NAME = "read_time"; + + private static final String RUN_TIME_METRIC_NAME = "run_time"; + + private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count"; + + private static final String NAMESPACE = SparkReceiverIOIT.class.getName(); + + private static final String TEST_ID = UUID.randomUUID().toString(); + + private static final String TIMESTAMP = Timestamp.now().toString(); + + private static final String TEST_MESSAGE_PREFIX = "Test "; + + private static Options options; + + private static SyntheticSourceOptions sourceOptions; + + private static GenericContainer<?> rabbitMqContainer; + + private static InfluxDBSettings settings; + + private static final ExperimentalOptions sdfPipelineOptions; + + static { + sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class); + sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); + } + + @Rule public TestPipeline readPipeline = TestPipeline.fromOptions(sdfPipelineOptions); + + @BeforeClass + public static void setup() throws IOException { + options = IOITHelper.readIOTestPipelineOptions(Options.class); + sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class); + if (options.isWithTestcontainers()) { + setupRabbitMqContainer(); + } else { + settings = + InfluxDBSettings.builder() + .withHost(options.getInfluxHost()) + .withDatabase(options.getInfluxDatabase()) + .withMeasurement(options.getInfluxMeasurement()) + .get(); + } + clearRabbitMQ(); + } + + @AfterClass + public static void afterClass() { + if (rabbitMqContainer != null) { + rabbitMqContainer.stop(); + } + + clearRabbitMQ(); + } + + private static void setupRabbitMqContainer() { + rabbitMqContainer = + new RabbitMQContainer( + DockerImageName.parse("rabbitmq").withTag(options.getRabbitMqContainerVersion())) + .withExposedPorts(5672, 15672); + rabbitMqContainer.start(); + options.setRabbitMqBootstrapServerAddress( + getBootstrapServers( + rabbitMqContainer.getHost(), rabbitMqContainer.getMappedPort(5672).toString())); + } + + private static String getBootstrapServers(String host, String port) { + return String.format("amqp://guest:guest@%s:%s", host, port); + } + + /** Pipeline options specific for this test. */ + public interface Options extends IOTestPipelineOptions, StreamingOptions { + + @Description("Options for synthetic source.") + @Validation.Required + @Default.String("{\"numRecords\": \"500\",\"keySizeBytes\": \"1\",\"valueSizeBytes\": \"90\"}") + String getSourceOptions(); + + void setSourceOptions(String sourceOptions); + + @Description("RabbitMQ bootstrap server address") + @Default.String("amqp://guest:guest@localhost:5672") + String getRabbitMqBootstrapServerAddress(); + + void setRabbitMqBootstrapServerAddress(String address); + + @Description("RabbitMQ stream") + @Default.String("rabbitMqTestStream") + String getStreamName(); + + void setStreamName(String streamName); + + @Description("Whether to use testcontainers") + @Default.Boolean(false) + Boolean isWithTestcontainers(); + + void setWithTestcontainers(Boolean withTestcontainers); + + @Description("RabbitMQ container version. 
Use when withTestcontainers is true")
+    @Nullable
+    @Default.String("3.9-alpine")
+    String getRabbitMqContainerVersion();
+
+    void setRabbitMqContainerVersion(String rabbitMqContainerVersion);
+
+    @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
+    @Default.Integer(50)
+    @Validation.Required
+    Integer getReadTimeout();
+
+    void setReadTimeout(Integer readTimeout);
+  }
+
+  private void writeToRabbitMq(final List<String> messages)
+      throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException,
+          TimeoutException {
+
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+    connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+    Map<String, Object> arguments = new HashMap<>();
+    arguments.put("x-queue-type", "stream");
+
+    try (Connection connection = connectionFactory.newConnection();
+        Channel channel = connection.createChannel()) {
+      channel.queueDeclare(options.getStreamName(), true, false, false, arguments);
+
+      messages.forEach(
+          message -> {
+            try {
+              channel.basicPublish(
+                  "",
+                  options.getStreamName(),
+                  MessageProperties.PERSISTENT_TEXT_PLAIN,
+                  message.getBytes(StandardCharsets.UTF_8));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+  }
+
+  private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
+    final ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
+            .withConstructorArgs(
+                options.getRabbitMqBootstrapServerAddress(),
+                options.getStreamName(),
+                sourceOptions.numRecords);
+
+    return SparkReceiverIO.<String>read()
+        .withGetOffsetFn(
+            rabbitMqMessage ->
+                Long.valueOf(rabbitMqMessage.substring(TEST_MESSAGE_PREFIX.length())))
+        .withSparkReceiverBuilder(receiverBuilder);
+  }
+
+  /**
+   * Since streams in RabbitMQ are durable by definition, we have to clean them up after test
+   * execution. The simplest way is to delete the whole stream.
+   */
+  private static void clearRabbitMQ() {
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+    try {
+      connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+      try (Connection connection = connectionFactory.newConnection();
+          Channel channel = connection.createChannel()) {
+        channel.queueDelete(options.getStreamName());
+      }
+    } catch (URISyntaxException
+        | NoSuchAlgorithmException
+        | KeyManagementException
+        | IOException
+        | TimeoutException e) {
+      LOG.error("Error during RabbitMQ clean up", e);
+    }
+  }
+
+  /** Function for counting processed pipeline elements. */
+  private static class CountingFn extends DoFn<String, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
+
+  private void cancelIfTimeout(PipelineResult readResult, PipelineResult.State readState)
+      throws IOException {
+    if (readState == null) {
+      readResult.cancel();
+    }
+  }
+
+  private long readElementMetric(PipelineResult result) {
+    MetricsReader metricsReader = new MetricsReader(result, SparkReceiverIOIT.NAMESPACE);
+    return metricsReader.getCounterMetric(SparkReceiverIOIT.READ_ELEMENT_METRIC_NAME);
+  }
+
+  private Set<NamedTestResult> readMetrics(PipelineResult readResult) {
+    BiFunction<MetricsReader, String, NamedTestResult> supplier =
+        (reader, metricName) -> {
+          long start = reader.getStartTimeMetric(metricName);
+          long end = reader.getEndTimeMetric(metricName);
+          return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1e3);
+        };
+
+    NamedTestResult readTime =
+        supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
+    NamedTestResult runTime =
+        NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, readTime.getValue());
+
+    return ImmutableSet.of(readTime, runTime);
+  }
+
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =
+        LongStream.range(0, sourceOptions.numRecords)
+            .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
+            .collect(Collectors.toList());
+
+    try {
+      writeToRabbitMq(messages);
+    } catch (Exception e) {
+      LOG.error("Cannot write to RabbitMQ: {}", e.getMessage());
+      fail();
+    }
+    LOG.info(sourceOptions.numRecords + " records were successfully written to RabbitMQ");
+
+    // Use streaming pipeline to read RabbitMQ records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    readPipeline
+        .apply("Read from unbounded RabbitMq", readFromRabbitMqWithOffset())
+        .setCoder(StringUtf8Coder.of())
+        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+        .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
+
+    final PipelineResult readResult = readPipeline.run();
+    final PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeout(readResult, readState);
+
+    assertEquals(sourceOptions.numRecords, readElementMetric(readResult));
+
+    if (!options.isWithTestcontainers()) {
+      Set<NamedTestResult> metrics = readMetrics(readResult);
+      IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
+    }
+  }
+}
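To close the loop on how the pieces above run together, here is a hedged end-to-end sketch of the read pipeline that `testSparkReceiverIOReadsInStreamingWithOffset` drives. The streaming flag, coder, bounded wait, and cancel-on-timeout mirror the test; the connection values and the 900-second timeout are illustrative, not prescribed by the commit:

```java
package org.apache.beam.sdk.io.sparkreceiver;

import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.joda.time.Duration;

// Sketch only: the shape of the pipeline used by SparkReceiverIOIT.
public class ReadPipelineSketch {

  public static void main(String[] args) throws IOException {
    StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
    options.setStreaming(true); // SparkReceiverIO.read() produces an unbounded PCollection

    ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
        new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
            // Placeholder connection values; the IT reads these from pipeline options.
            .withConstructorArgs("amqp://guest:guest@localhost:5672", "rabbitMqTestStream", 500L);

    Pipeline p = Pipeline.create(options);
    p.apply(
            "Read from unbounded RabbitMq",
            SparkReceiverIO.<String>read()
                .withSparkReceiverBuilder(receiverBuilder)
                .withGetOffsetFn(m -> Long.valueOf(m.substring("Test ".length()))))
        .setCoder(StringUtf8Coder.of());

    PipelineResult result = p.run();
    // Bound the unbounded read with a timeout and cancel if it has not finished,
    // as cancelIfTimeout does in the test above.
    PipelineResult.State state = result.waitUntilFinish(Duration.standardSeconds(900));
    if (state == null) {
      result.cancel();
    }
  }
}
```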