This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f57cb9c208d [CdapIO] Add integration tests for SparkReceiverIO (#23305)
f57cb9c208d is described below

commit f57cb9c208d9e1ac2e353c3a2d2be2b5b3ccee4a
Author: Vitaly Terentyev <vitaly.terent...@akvelon.com>
AuthorDate: Wed Nov 2 20:15:50 2022 +0400

    [CdapIO] Add integration tests for SparkReceiverIO (#23305)
    
    * Add integration tests for SparkReceiverIO
    
    * Refactoring. Resolve comments.
    
    * Add CustomOffsetRangeTracker. Fix reading from RabbitMqReceiver
    
    * Fix compile
    
    * Fix rabbitmq yaml file
    
    * Refactoring
    
    * Add tests for SDF
    
    Co-authored-by: Sorokin Andrey <sorokin1...@gmail.com>
---
 .test-infra/jenkins/README.md                      |   1 +
 .../job_PerformanceTests_SparkReceiverIO_IT.groovy |  84 +++++
 .test-infra/kubernetes/rabbit/rabbitmq.yaml        | 187 +++++++++++
 .../Java_IO_IT_Tests_Dataflow.json                 | 123 +++++++
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/java/io/sparkreceiver/README.md               |  38 +++
 sdks/java/io/sparkreceiver/build.gradle            |  11 +-
 .../ReadFromSparkReceiverWithOffsetDoFn.java       | 106 +++++-
 .../sparkreceiver/RabbitMqReceiverWithOffset.java  | 163 ++++++++++
 .../ReadFromSparkReceiverWithOffsetDoFnTest.java   | 145 +++++++++
 .../sdk/io/sparkreceiver/SparkReceiverIOIT.java    | 354 +++++++++++++++++++++
 11 files changed, 1193 insertions(+), 20 deletions(-)

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index e547c23f24f..e53dae86c45 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -155,6 +155,7 @@ Beam Jenkins overview page: 
[link](https://ci-beam.apache.org/)
 | beam_PerformanceTests_PubsubIOIT_Python_Streaming | 
[cron](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/),
 
[phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming_PR/)
 | `Run PubsubIO Performance Test Python` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming)
 |
 | beam_PerformanceTests_SpannerIO_Read_2GB_Python | 
[cron](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python/),
 
[phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_PR/)
 | `Run SpannerIO Read 2GB Performance Test Python Batch` | [![Build 
Status](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python/badge/icon)](https://ci-beam.apache.o
 [...]
 | beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch | 
[cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/),
 
[phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch_PR/)
 | `Run SpannerIO Write 2GB Performance Test Python Batch` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_Per
 [...]
+| beam_PerformanceTests_SparkReceiverIOIT                | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SparkReceiverIOIT/)                                                                                                                          | `Run Java SparkReceiverIO Performance Test`             | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SparkReceiverIO/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Spa [...]
 | beam_PerformanceTests_TFRecordIOIT | 
[cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | 
`Run Java TFRecordIO Performance Test` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT)
 |
 | beam_PerformanceTests_TextIOIT | 
[cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/), 
[hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/)
 | `Run Java TextIO Performance Test` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT)
 [![Build 
Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://ci-be
 [...]
 | beam_PerformanceTests_WordCountIT_Py37 | 
[cron](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/) 
| `Run Python37 WordCountIT Performance Test` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37)
 |
diff --git a/.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy 
b/.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy
new file mode 100644
index 00000000000..0bfb01b43ce
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import Kubernetes
+import InfluxDBCredentialsHelper
+
+String jobName = "beam_PerformanceTests_SparkReceiver_IO"
+
+/**
+ * This job runs the SparkReceiver IO performance tests.
+ * It runs against a RabbitMQ cluster that is built by applying the folder .test-infra/kubernetes/rabbit
+ * in an existing Kubernetes cluster (DEFAULT_CLUSTER in Kubernetes.groovy).
+ * The resources created to run this test are:
+ * Pods: 1 RabbitMQ pod.
+ * Services: 1 broker.
+ * When the performance tests finish, all resources are cleaned up by a postBuild step in Kubernetes.groovy.
+ */
+job(jobName) {
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java SparkReceiverIO Performance Test',
+      'Run Java SparkReceiverIO Performance Test')
+  InfluxDBCredentialsHelper.useCredentials(delegate)
+
+  String namespace = common.getKubernetesNamespace(jobName)
+  String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
+  Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
+
+  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/rabbit/rabbitmq.yaml"))
+  String rabbitMqHostName = "LOAD_BALANCER_IP"
+  k8s.loadBalancerIP("rabbitmq", rabbitMqHostName)
+
+  Map pipelineOptions = [
+    tempRoot                      : 'gs://temp-storage-for-perf-tests',
+    project                       : 'apache-beam-testing',
+    runner                        : 'DataflowRunner',
+    sourceOptions                 : """
+                                     {
+                                       "numRecords": "600000",
+                                       "keySizeBytes": "1",
+                                       "valueSizeBytes": "90"
+                                     }
+                                   """.trim().replaceAll("\\s", ""),
+    bigQueryDataset               : 'beam_performance',
+    bigQueryTable                 : 'sparkreceiverioit_results',
+    influxMeasurement             : 'sparkreceiverioit_results',
+    influxDatabase                : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influxHost                    : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    rabbitMqBootstrapServerAddress: "amqp://guest:guest@\$${rabbitMqHostName}:5672",
+    streamName                    : 'rabbitMqTestStream',
+    readTimeout                   : '900',
+    numWorkers                    : '5',
+    autoscalingAlgorithm          : 'NONE'
+  ]
+
+  steps {
+    gradle {
+      rootBuildScriptDir(common.checkoutDir)
+      common.setGradleSwitches(delegate)
+      switches("--info")
+      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(pipelineOptions)}\'")
+      switches("-DintegrationTestRunner=dataflow")
+      tasks(":sdks:java:io:sparkreceiver:integrationTest --tests org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIOIT")
+    }
+  }
+}
diff --git a/.test-infra/kubernetes/rabbit/rabbitmq.yaml 
b/.test-infra/kubernetes/rabbit/rabbitmq.yaml
new file mode 100644
index 00000000000..72bd41d5a92
--- /dev/null
+++ b/.test-infra/kubernetes/rabbit/rabbitmq.yaml
@@ -0,0 +1,187 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: rabbitmq
+---
+kind: Service
+apiVersion: v1
+metadata:
+  name: rabbitmq-internal
+  labels:
+    app: rabbitmq
+spec:
+  clusterIP: None
+  ports:
+    - name: http
+      protocol: TCP
+      port: 15672
+    - name: amqp
+      protocol: TCP
+      port: 5672
+  selector:
+    app: rabbitmq
+---
+kind: Service
+apiVersion: v1
+metadata:
+  name: rabbitmq
+  labels:
+    app: rabbitmq
+    type: LoadBalancer
+spec:
+  type: LoadBalancer
+  ports:
+    - name: http
+      protocol: TCP
+      port: 15672
+    - name: amqp
+      protocol: TCP
+      port: 5672
+  selector:
+    app: rabbitmq
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: rabbitmq-config
+data:
+  enabled_plugins: |
+    [rabbitmq_management,rabbitmq_peer_discovery_k8s].
+
+  rabbitmq.conf: |
+    loopback_users = none
+    cluster_formation.peer_discovery_backend  = rabbit_peer_discovery_k8s
+    cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
+    cluster_formation.k8s.port = 443
+    ### cluster_formation.k8s.address_type = ip
+    cluster_formation.k8s.address_type = hostname
+    cluster_formation.node_cleanup.interval = 10
+    cluster_formation.node_cleanup.only_log_warning = true
+    cluster_partition_handling = autoheal
+    queue_master_locator=min-masters
+    cluster_formation.randomized_startup_delay_range.min = 0
+    cluster_formation.randomized_startup_delay_range.max = 2
+    cluster_formation.k8s.service_name = rabbitmq-internal
+    cluster_formation.k8s.hostname_suffix = .rabbitmq-internal.our-namespace.svc.cluster.local
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: rabbitmq
+spec:
+  selector:
+    matchLabels:
+      app: "rabbitmq"
+  serviceName: rabbitmq-internal
+  replicas: 1
+  volumeClaimTemplates:
+    - metadata:
+        name: rabbitmq-data
+        namespace: rabbit-test
+      spec:
+        storageClassName: standard
+        accessModes:
+          - ReadWriteOnce
+        resources:
+          requests:
+            storage: "3Gi"
+  template:
+    metadata:
+      labels:
+        app: rabbitmq
+      annotations:
+        scheduler.alpha.kubernetes.io/affinity: >
+          {
+            "podAntiAffinity": {
+              "requiredDuringSchedulingIgnoredDuringExecution": [{
+                "labelSelector": {
+                  "matchExpressions": [{
+                    "key": "app",
+                    "operator": "In",
+                    "values": ["rabbitmq"]
+                  }]
+                },
+                "topologyKey": "kubernetes.io/hostname"
+              }]
+            }
+          }
+    spec:
+      serviceAccountName: rabbitmq
+      terminationGracePeriodSeconds: 10
+      containers:
+        - name: rabbitmq-k8s
+          image: rabbitmq:3.7
+          volumeMounts:
+            - name: config-volume
+              mountPath: /etc/rabbitmq
+            - name: rabbitmq-data
+              mountPath: /var/lib/rabbitmq/mnesia
+          ports:
+            - name: http
+              protocol: TCP
+              containerPort: 15672
+            - name: amqp
+              protocol: TCP
+              containerPort: 5672
+          livenessProbe:
+            exec:
+              command: ["rabbitmqctl", "status"]
+            initialDelaySeconds: 60
+            periodSeconds: 10
+            timeoutSeconds: 10
+          readinessProbe:
+            exec:
+              command: ["rabbitmqctl", "status"]
+            initialDelaySeconds: 10
+            periodSeconds: 10
+            timeoutSeconds: 10
+          imagePullPolicy: Always
+          env:
+            - name: MY_POD_IP
+              valueFrom:
+                fieldRef:
+                  fieldPath: status.podIP
+            - name: HOSTNAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+            - name: NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: RABBITMQ_USE_LONGNAME
+              value: "true"
+            - name: RABBITMQ_NODENAME
+              value: "rabbit@$(HOSTNAME).rabbitmq-internal.$(NAMESPACE).svc.cluster.local"
+            - name: K8S_SERVICE_NAME
+              value: "rabbitmq-internal"
+            - name: RABBITMQ_ERLANG_COOKIE
+              value: "cookie"
+      volumes:
+        - name: config-volume
+          configMap:
+            name: rabbitmq-config
+            items:
+              - key: rabbitmq.conf
+                path: rabbitmq.conf
+              - key: enabled_plugins
+                path: enabled_plugins
+        - name: rabbitmq-data
+          persistentVolumeClaim:
+            claimName: rabbitmq-data
diff --git 
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json
 
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json
index 1c6cde8bbb7..733de44551c 100644
--- 
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json
+++ 
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Java_IO_IT_Tests_Dataflow.json
@@ -2717,6 +2717,129 @@
         "align": false,
         "alignLevel": null
       }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "cacheTimeout": null,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "BeamInfluxDB",
+      "description": "",
+      "fill": 1,
+      "fillGradient": 0,
+      "gridPos": {
+        "h": 9,
+        "w": 12,
+        "x": 12,
+        "y": 106
+      },
+      "hiddenSeries": false,
+      "id": 27,
+      "interval": "6h",
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 2,
+      "links": [],
+      "nullPointMode": "connected",
+      "options": {
+        "dataLinks": []
+      },
+      "percentage": false,
+      "pluginVersion": "6.7.2",
+      "pointradius": 2,
+      "points": true,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "alias": "$tag_metric",
+          "groupBy": [
+            {
+              "params": [
+                "$__interval"
+              ],
+              "type": "time"
+            }
+          ],
+          "measurement": "",
+          "orderByTime": "ASC",
+          "policy": "default",
+          "query": "SELECT mean(\"value\") FROM \"sparkreceiverioit_results\" 
WHERE \"metric\" =~ /time/ AND $timeFilter GROUP BY time($__interval), 
\"metric\"",
+          "rawQuery": true,
+          "refId": "A",
+          "resultFormat": "time_series",
+          "select": [
+            [
+              {
+                "params": [
+                  "value"
+                ],
+                "type": "field"
+              },
+              {
+                "params": [],
+                "type": "mean"
+              }
+            ]
+          ],
+          "tags": []
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "SparkReceiverIO",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": true,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "$$hashKey": "object:403",
+          "format": "s",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "$$hashKey": "object:404",
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "refresh": false,
diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 6766db5d509..6aa2e4859c5 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -726,6 +726,7 @@ class BeamModulePlugin implements Plugin<Project> {
         testcontainers_postgresql                   : 
"org.testcontainers:postgresql:$testcontainers_version",
         testcontainers_mysql                        : 
"org.testcontainers:mysql:$testcontainers_version",
         testcontainers_gcloud                       : 
"org.testcontainers:gcloud:$testcontainers_version",
+        testcontainers_rabbitmq                     : "org.testcontainers:rabbitmq:$testcontainers_version",
         vendored_grpc_1_48_1                        : 
"org.apache.beam:beam-vendor-grpc-1_48_1:0.1",
         vendored_guava_26_0_jre                     : 
"org.apache.beam:beam-vendor-guava-26_0-jre:0.1",
         vendored_calcite_1_28_0                     : 
"org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
diff --git a/sdks/java/io/sparkreceiver/README.md 
b/sdks/java/io/sparkreceiver/README.md
new file mode 100644
index 00000000000..6ce48efd58f
--- /dev/null
+++ b/sdks/java/io/sparkreceiver/README.md
@@ -0,0 +1,38 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+SparkReceiverIO contains I/O transforms which allow you to read messages from a Spark Receiver (org.apache.spark.streaming.receiver.Receiver).
+
+## Dependencies
+
+To use SparkReceiverIO, you must first add a dependency on `beam-sdks-java-io-sparkreceiver`.
+
+```maven
+<dependency>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-sparkreceiver</artifactId>
+    <version>...</version>
+</dependency>
+```
+
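+## Usage
+
+A minimal read sketch is shown below. The builder methods mirror the tests added in this commit;
+`MyReceiverWithOffset`, the offset function, and the timestamp function are placeholders that you
+must replace with your own implementations.
+
+```java
+// A Spark Receiver<String> that also implements HasOffset (placeholder class you provide).
+ReceiverBuilder<String, MyReceiverWithOffset> receiverBuilder =
+    new ReceiverBuilder<>(MyReceiverWithOffset.class).withConstructorArgs();
+
+// Assumes an existing Beam Pipeline named `pipeline`.
+PCollection<String> records =
+    pipeline.apply(
+        "Read from Spark Receiver",
+        SparkReceiverIO.<String>read()
+            .withSparkReceiverBuilder(receiverBuilder)
+            // Extract a monotonically increasing offset from each record.
+            .withGetOffsetFn(Long::valueOf)
+            // Optionally assign an event timestamp to each record.
+            .withTimestampFn(Instant::parse));
+```
+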
+## Documentation
+
+The documentation is maintained in the Javadoc of the SparkReceiverIO class. It includes
+usage examples and primary concepts.
+- [SparkReceiverIO.java](src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java)
diff --git a/sdks/java/io/sparkreceiver/build.gradle 
b/sdks/java/io/sparkreceiver/build.gradle
index fb1f681d926..52c6a634049 100644
--- a/sdks/java/io/sparkreceiver/build.gradle
+++ b/sdks/java/io/sparkreceiver/build.gradle
@@ -33,9 +33,10 @@ ext.summary = """Apache Beam SDK provides a simple, 
Java-based
 interface for streaming integration with CDAP plugins."""
 
 configurations.all {
-    exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    exclude group: 'ch.qos.logback', module: 'logback-classic'
     exclude group: 'org.slf4j', module: 'slf4j-jdk14'
-    exclude group: 'org.slf4j', module: 'slf4j-simple'
+    exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    exclude group: 'org.slf4j', module: 'slf4j-reload4j'
 }
 
 dependencies {
@@ -48,6 +49,10 @@ dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     compileOnly "org.scala-lang:scala-library:2.11.12"
     testImplementation library.java.junit
+    testImplementation library.java.testcontainers_rabbitmq
     testImplementation project(path: ":runners:direct-java", configuration: 
"shadow")
-    testImplementation project(path: ":examples:java", configuration: 
"testRuntimeMigration")
+    testImplementation project(":sdks:java:io:synthetic")
+    testImplementation project(path: ":sdks:java:io:common", configuration: 
"testRuntimeMigration")
+    testImplementation project(path: ":sdks:java:testing:test-utils", 
configuration: "testRuntimeMigration")
+    testImplementation "com.rabbitmq:amqp-client:5.16.0"
 }
diff --git 
a/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
 
b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
index 2eb46561e2c..8b2fdcb01ad 100644
--- 
a/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
+++ 
b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.sparkreceiver;
 
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -60,7 +63,7 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends 
DoFn<byte[], V> {
       LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
 
   /** Constant waiting time after the {@link Receiver} starts. Required to 
prepare for polling */
-  private static final int START_POLL_TIMEOUT_MS = 1000;
+  private static final int START_POLL_TIMEOUT_MS = 2000;
 
   private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
       createWatermarkEstimatorFn;
@@ -108,10 +111,68 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends 
DoFn<byte[], V> {
     return restrictionTracker(element, 
offsetRange).getProgress().getWorkRemaining();
   }
 
+  /**
+   * {@link OffsetRangeTracker} that performs basic split only in {@link
+   * OffsetRangeTracker#checkDone}. This behavior allows reading from the primary range until
+   * resume, and then splitting into {alreadyReadRange, residualRange}.
+   */
+  private static class CustomOffsetRangeTracker extends OffsetRangeTracker {
+
+    public CustomOffsetRangeTracker(OffsetRange range) {
+      super(range);
+    }
+
+    @SuppressWarnings("nullness") // Base method can return null
+    @Override
+    public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+      if (lastAttemptedOffset != null) {
+        if (range.getTo() == Long.MAX_VALUE) {
+          // Do not split, just use primary range
+          return null;
+        } else {
+          // Need to add residual range
+          OffsetRange res = new OffsetRange(range.getTo(), Long.MAX_VALUE);
+          this.range = new OffsetRange(range.getFrom(), range.getTo());
+          return SplitResult.of(range, res);
+        }
+      }
+      // Basic split logic when lastAttemptedOffset is null
+
+      // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+      // precision.
+      BigDecimal cur =
+          BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128);
+      // split = cur + max(1, (range.getTo() - cur) * fractionOfRemainder)
+      BigDecimal splitPos =
+          cur.add(
+              BigDecimal.valueOf(range.getTo())
+                  .subtract(cur, MathContext.DECIMAL128)
+                  .multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128)
+                  .max(BigDecimal.ONE),
+              MathContext.DECIMAL128);
+
+      long split = splitPos.longValue();
+      if (split >= range.getTo()) {
+        return null;
+      }
+      OffsetRange res = new OffsetRange(split, range.getTo());
+      this.range = new OffsetRange(range.getFrom(), split);
+      return SplitResult.of(range, res);
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {
+      if (lastAttemptedOffset != null && range.getTo() == Long.MAX_VALUE) {
+        // Perform basic split
+        super.trySplit(0);
+      }
+    }
+  }
+
   @NewTracker
   public OffsetRangeTracker restrictionTracker(
       @Element byte[] element, @Restriction OffsetRange restriction) {
-    return new OffsetRangeTracker(restriction);
+    return new CustomOffsetRangeTracker(restriction);
   }
 
   @GetRestrictionCoder
@@ -223,27 +284,38 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends 
DoFn<byte[], V> {
       LOG.error("Can not build Spark Receiver", e);
       throw new IllegalStateException("Spark Receiver was not built!");
     }
+    LOG.debug("Restriction {}", tracker.currentRestriction().toString());
     sparkConsumer = new 
SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom());
     sparkConsumer.start(sparkReceiver);
 
-    while (sparkConsumer.hasRecords()) {
-      V record = sparkConsumer.poll();
-      if (record != null) {
-        Long offset = getOffsetFn.apply(record);
-        if (!tracker.tryClaim(offset)) {
-          sparkConsumer.stop();
-          LOG.debug("Stop for restriction: {}", 
tracker.currentRestriction().toString());
-          return ProcessContinuation.stop();
+    while (true) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(START_POLL_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.error("SparkReceiver was interrupted before polling started", e);
+        throw new IllegalStateException("Spark Receiver was interrupted before 
polling started");
+      }
+      if (!sparkConsumer.hasRecords()) {
+        sparkConsumer.stop();
+        tracker.checkDone();
+        LOG.debug("Resume for restriction: {}", 
tracker.currentRestriction().toString());
+        return ProcessContinuation.resume();
+      }
+      while (sparkConsumer.hasRecords()) {
+        V record = sparkConsumer.poll();
+        if (record != null) {
+          Long offset = getOffsetFn.apply(record);
+          if (!tracker.tryClaim(offset)) {
+            sparkConsumer.stop();
+            LOG.debug("Stop for restriction: {}", 
tracker.currentRestriction().toString());
+            return ProcessContinuation.stop();
+          }
+          Instant currentTimeStamp = getTimestampFn.apply(record);
+          ((ManualWatermarkEstimator<Instant>) watermarkEstimator).setWatermark(currentTimeStamp);
+          receiver.outputWithTimestamp(record, currentTimeStamp);
         }
-        Instant currentTimeStamp = getTimestampFn.apply(record);
-        ((ManualWatermarkEstimator<Instant>) 
watermarkEstimator).setWatermark(currentTimeStamp);
-        System.err.println(record);
-        receiver.outputWithTimestamp(record, currentTimeStamp);
       }
     }
-    sparkConsumer.stop();
-    LOG.debug("Resume for restriction: {}", 
tracker.currentRestriction().toString());
-    return ProcessContinuation.resume();
   }
 
   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
diff --git 
a/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java
 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java
new file mode 100644
index 00000000000..362e6280eb2
--- /dev/null
+++ 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Imitation of a Spark {@link Receiver} for RabbitMQ that implements the {@link HasOffset}
+ * interface. Used to test {@link SparkReceiverIO#read()}.
+ */
+class RabbitMqReceiverWithOffset extends Receiver<String> implements HasOffset {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RabbitMqReceiverWithOffset.class);
+  private static final int MAX_PREFETCH_COUNT = 65535;
+
+  private final String rabbitmqUrl;
+  private final String streamName;
+  private final long totalMessagesNumber;
+  private long startOffset;
+  private static final int READ_TIMEOUT_IN_MS = 100;
+
+  RabbitMqReceiverWithOffset(
+      final String uri, final String streamName, final long totalMessagesNumber) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    rabbitmqUrl = uri;
+    this.streamName = streamName;
+    this.totalMessagesNumber = totalMessagesNumber;
+  }
+
+  @Override
+  public void setStartOffset(Long startOffset) {
+    this.startOffset = startOffset != null ? startOffset : 0;
+  }
+
+  @Override
+  public Long getEndOffset() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void onStart() {
+    Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build()).submit(this::receive);
+  }
+
+  @Override
+  public void onStop() {}
+
+  private void receive() {
+    long currentOffset = startOffset;
+
+    final TestConsumer testConsumer;
+    final Connection connection;
+    final Channel channel;
+
+    try {
+      LOG.info("Starting receiver with offset {}", currentOffset);
+      final ConnectionFactory connectionFactory = new ConnectionFactory();
+      connectionFactory.setUri(rabbitmqUrl);
+      connectionFactory.setAutomaticRecoveryEnabled(true);
+      connectionFactory.setConnectionTimeout(600000);
+      connectionFactory.setNetworkRecoveryInterval(5000);
+      connectionFactory.setRequestedHeartbeat(60);
+      connectionFactory.setTopologyRecoveryEnabled(true);
+      connectionFactory.setRequestedChannelMax(0);
+      connectionFactory.setRequestedFrameMax(0);
+      connection = connectionFactory.newConnection();
+
+      channel = connection.createChannel();
+      channel.queueDeclare(
+          streamName, true, false, false, Collections.singletonMap("x-queue-type", "stream"));
+      channel.basicQos(Math.min(MAX_PREFETCH_COUNT, (int) totalMessagesNumber));
+      testConsumer = new TestConsumer(this, channel, this::store);
+
+      channel.basicConsume(
+          streamName,
+          false,
+          Collections.singletonMap("x-stream-offset", currentOffset),
+          testConsumer);
+    } catch (Exception e) {
+      LOG.error("Can not basic consume", e);
+      throw new RuntimeException(e);
+    }
+
+    while (!isStopped()) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(READ_TIMEOUT_IN_MS);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted", e);
+      }
+    }
+
+    try {
+      LOG.info("Stopping receiver");
+      channel.close();
+      connection.close();
+    } catch (TimeoutException | IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** A simple RabbitMQ {@code Consumer}. */
+  static class TestConsumer extends DefaultConsumer {
+
+    private final java.util.function.Consumer<String> messageConsumer;
+    private final Receiver<String> receiver;
+
+    public TestConsumer(
+        Receiver<String> receiver,
+        Channel channel,
+        java.util.function.Consumer<String> messageConsumer) {
+      super(channel);
+      this.receiver = receiver;
+      this.messageConsumer = messageConsumer;
+    }
+
+    @Override
+    public void handleDelivery(
+        String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+      try {
+        final String sMessage = new String(body, StandardCharsets.UTF_8);
+        LOG.trace("Adding message to consumer: {}", sMessage);
+        messageConsumer.accept(sMessage);
+        if (getChannel().isOpen() && !receiver.isStopped()) {
+          getChannel().basicAck(envelope.getDeliveryTag(), false);
+        }
+      } catch (Exception e) {
+        LOG.error("Can't read from RabbitMQ: {}", e.getMessage());
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
new file mode 100644
index 00000000000..67b4e2cabba
--- /dev/null
+++ 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Test class for {@link ReadFromSparkReceiverWithOffsetDoFn}. */
+public class ReadFromSparkReceiverWithOffsetDoFnTest {
+
+  private static final byte[] TEST_ELEMENT = new byte[] {};
+
+  private final ReadFromSparkReceiverWithOffsetDoFn<String> dofnInstance =
+      new ReadFromSparkReceiverWithOffsetDoFn<>(makeReadTransform());
+
+  private SparkReceiverIO.Read<String> makeReadTransform() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    return SparkReceiverIO.<String>read()
+        .withSparkReceiverBuilder(receiverBuilder)
+        .withGetOffsetFn(Long::valueOf)
+        .withTimestampFn(Instant::parse);
+  }
+
+  private static class MockOutputReceiver implements DoFn.OutputReceiver<String> {
+
+    private final List<String> records = new ArrayList<>();
+
+    @Override
+    public void output(String output) {}
+
+    @Override
+    public void outputWithTimestamp(
+        String output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) {
+      records.add(output);
+    }
+
+    public List<String> getOutputs() {
+      return this.records;
+    }
+  }
+
+  private final ManualWatermarkEstimator<Instant> mockWatermarkEstimator =
+      new ManualWatermarkEstimator<Instant>() {
+
+        @Override
+        public void setWatermark(Instant watermark) {
+          // do nothing
+        }
+
+        @Override
+        public Instant currentWatermark() {
+          return null;
+        }
+
+        @Override
+        public Instant getState() {
+          return null;
+        }
+      };
+
+  private List<String> createExpectedRecords(int numRecords) {
+    List<String> records = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      records.add(String.valueOf(i));
+    }
+    return records;
+  }
+
+  @Test
+  public void testInitialRestriction() {
+    long expectedStartOffset = 0L;
+    OffsetRange result = dofnInstance.initialRestriction(TEST_ELEMENT);
+    assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
+  }
+
+  @Test
+  public void testRestrictionTrackerSplit() {
+    OffsetRangeTracker offsetRangeTracker =
+        dofnInstance.restrictionTracker(
+            TEST_ELEMENT, dofnInstance.initialRestriction(TEST_ELEMENT));
+    assertEquals(0L, offsetRangeTracker.currentRestriction().getFrom());
+    assertEquals(Long.MAX_VALUE, offsetRangeTracker.currentRestriction().getTo());
+
+    assertEquals(
+        SplitResult.of(new OffsetRange(0, 0), new OffsetRange(0, Long.MAX_VALUE)),
+        offsetRangeTracker.trySplit(0d));
+
+    offsetRangeTracker =
+        dofnInstance.restrictionTracker(
+            TEST_ELEMENT, dofnInstance.initialRestriction(TEST_ELEMENT));
+
+    assertTrue(offsetRangeTracker.tryClaim(0L));
+    assertNull(offsetRangeTracker.trySplit(0d));
+
+    offsetRangeTracker.checkDone();
+    assertEquals(
+        SplitResult.of(new OffsetRange(0, 1), new OffsetRange(1, Long.MAX_VALUE)),
+        offsetRangeTracker.trySplit(0d));
+  }
+
+  @Test
+  public void testProcessElement() {
+    MockOutputReceiver receiver = new MockOutputReceiver();
+    DoFn.ProcessContinuation result =
+        dofnInstance.processElement(
+            TEST_ELEMENT,
+            dofnInstance.restrictionTracker(
+                TEST_ELEMENT, dofnInstance.initialRestriction(TEST_ELEMENT)),
+            mockWatermarkEstimator,
+            receiver);
+    assertEquals(DoFn.ProcessContinuation.resume(), result);
+    assertEquals(
+        createExpectedRecords(CustomReceiverWithOffset.RECORDS_COUNT), receiver.getOutputs());
+  }
+}
diff --git 
a/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java
 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java
new file mode 100644
index 00000000000..b335aab2ed5
--- /dev/null
+++ 
b/sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.common.IOITHelper;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * IO Integration test for {@link org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO}.
+ *
+ * <p>{@see https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests} for
+ * more details.
+ *
+ * <p>NOTE: This test sets the retention policy of the messages so that all messages are retained
+ * in the stream and can be read back after writing.
+ */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class);
+
+  private static final String READ_TIME_METRIC_NAME = "read_time";
+
+  private static final String RUN_TIME_METRIC_NAME = "run_time";
+
+  private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count";
+
+  private static final String NAMESPACE = SparkReceiverIOIT.class.getName();
+
+  private static final String TEST_ID = UUID.randomUUID().toString();
+
+  private static final String TIMESTAMP = Timestamp.now().toString();
+
+  private static final String TEST_MESSAGE_PREFIX = "Test ";
+
+  private static Options options;
+
+  private static SyntheticSourceOptions sourceOptions;
+
+  private static GenericContainer<?> rabbitMqContainer;
+
+  private static InfluxDBSettings settings;
+
+  private static final ExperimentalOptions sdfPipelineOptions;
+
+  static {
+    sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);
+    sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
+  }
+
+  @Rule public TestPipeline readPipeline = TestPipeline.fromOptions(sdfPipelineOptions);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    options = IOITHelper.readIOTestPipelineOptions(Options.class);
+    sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
+    if (options.isWithTestcontainers()) {
+      setupRabbitMqContainer();
+    } else {
+      settings =
+          InfluxDBSettings.builder()
+              .withHost(options.getInfluxHost())
+              .withDatabase(options.getInfluxDatabase())
+              .withMeasurement(options.getInfluxMeasurement())
+              .get();
+    }
+    clearRabbitMQ();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (rabbitMqContainer != null) {
+      rabbitMqContainer.stop();
+    }
+
+    clearRabbitMQ();
+  }
+
+  private static void setupRabbitMqContainer() {
+    rabbitMqContainer =
+        new RabbitMQContainer(
+                DockerImageName.parse("rabbitmq").withTag(options.getRabbitMqContainerVersion()))
+            .withExposedPorts(5672, 15672);
+    rabbitMqContainer.start();
+    options.setRabbitMqBootstrapServerAddress(
+        getBootstrapServers(
+            rabbitMqContainer.getHost(), rabbitMqContainer.getMappedPort(5672).toString()));
+  }
+
+  private static String getBootstrapServers(String host, String port) {
+    return String.format("amqp://guest:guest@%s:%s", host, port);
+  }
+
+  /** Pipeline options specific for this test. */
+  public interface Options extends IOTestPipelineOptions, StreamingOptions {
+
+    @Description("Options for synthetic source.")
+    @Validation.Required
+    @Default.String("{\"numRecords\": \"500\",\"keySizeBytes\": 
\"1\",\"valueSizeBytes\": \"90\"}")
+    String getSourceOptions();
+
+    void setSourceOptions(String sourceOptions);
+
+    @Description("RabbitMQ bootstrap server address")
+    @Default.String("amqp://guest:guest@localhost:5672")
+    String getRabbitMqBootstrapServerAddress();
+
+    void setRabbitMqBootstrapServerAddress(String address);
+
+    @Description("RabbitMQ stream")
+    @Default.String("rabbitMqTestStream")
+    String getStreamName();
+
+    void setStreamName(String streamName);
+
+    @Description("Whether to use testcontainers")
+    @Default.Boolean(false)
+    Boolean isWithTestcontainers();
+
+    void setWithTestcontainers(Boolean withTestcontainers);
+
+    @Description("RabbitMQ container version. Use when useTestcontainers is 
true")
+    @Nullable
+    @Default.String("3.9-alpine")
+    String getRabbitMqContainerVersion();
+
+    void setRabbitMqContainerVersion(String rabbitMqContainerVersion);
+
+    @Description("Time to wait for the events to be processed by the read 
pipeline (in seconds)")
+    @Default.Integer(50)
+    @Validation.Required
+    Integer getReadTimeout();
+
+    void setReadTimeout(Integer readTimeout);
+  }
+
+  private void writeToRabbitMq(final List<String> messages)
+      throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException,
+          TimeoutException {
+
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+    connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+    Map<String, Object> arguments = new HashMap<>();
+    arguments.put("x-queue-type", "stream");
+
+    try (Connection connection = connectionFactory.newConnection();
+        Channel channel = connection.createChannel()) {
+      channel.queueDeclare(options.getStreamName(), true, false, false, arguments);
+
+      messages.forEach(
+          message -> {
+            try {
+              channel.basicPublish(
+                  "",
+                  options.getStreamName(),
+                  MessageProperties.PERSISTENT_TEXT_PLAIN,
+                  message.getBytes(StandardCharsets.UTF_8));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+  }
+
+  private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
+    final ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
+            .withConstructorArgs(
+                options.getRabbitMqBootstrapServerAddress(),
+                options.getStreamName(),
+                sourceOptions.numRecords);
+
+    return SparkReceiverIO.<String>read()
+        .withGetOffsetFn(
+            rabbitMqMessage ->
+                Long.valueOf(rabbitMqMessage.substring(TEST_MESSAGE_PREFIX.length())))
+        .withSparkReceiverBuilder(receiverBuilder);
+  }
+
+  /**
+   * Since streams in RabbitMQ are durable by definition, we have to clean them up after test
+   * execution. The simplest way is to delete the whole stream after test execution.
+   */
+  private static void clearRabbitMQ() {
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+    try {
+      connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+      try (Connection connection = connectionFactory.newConnection();
+          Channel channel = connection.createChannel()) {
+        channel.queueDelete(options.getStreamName());
+      }
+    } catch (URISyntaxException
+        | NoSuchAlgorithmException
+        | KeyManagementException
+        | IOException
+        | TimeoutException e) {
+      LOG.error("Error during RabbitMQ clean up", e);
+    }
+  }
+
+  /** Function for counting processed pipeline elements. */
+  private static class CountingFn extends DoFn<String, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
+
+  private void cancelIfTimeout(PipelineResult readResult, PipelineResult.State readState)
+      throws IOException {
+    if (readState == null) {
+      readResult.cancel();
+    }
+  }
+
+  private long readElementMetric(PipelineResult result) {
+    MetricsReader metricsReader = new MetricsReader(result, SparkReceiverIOIT.NAMESPACE);
+    return metricsReader.getCounterMetric(SparkReceiverIOIT.READ_ELEMENT_METRIC_NAME);
+  }
+
+  private Set<NamedTestResult> readMetrics(PipelineResult readResult) {
+    BiFunction<MetricsReader, String, NamedTestResult> supplier =
+        (reader, metricName) -> {
+          long start = reader.getStartTimeMetric(metricName);
+          long end = reader.getEndTimeMetric(metricName);
+          return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1e3);
+        };
+
+    NamedTestResult readTime =
+        supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
+    NamedTestResult runTime =
+        NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, readTime.getValue());
+
+    return ImmutableSet.of(readTime, runTime);
+  }
+
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =
+        LongStream.range(0, sourceOptions.numRecords)
+            .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
+            .collect(Collectors.toList());
+
+    try {
+      writeToRabbitMq(messages);
+    } catch (Exception e) {
+      LOG.error("Can not write to rabbit {}", e.getMessage());
+      fail();
+    }
+    LOG.info(sourceOptions.numRecords + " records were successfully written to 
RabbitMQ");
+
+    // Use streaming pipeline to read RabbitMQ records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    readPipeline
+        .apply("Read from unbounded RabbitMq", readFromRabbitMqWithOffset())
+        .setCoder(StringUtf8Coder.of())
+        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, 
READ_TIME_METRIC_NAME)))
+        .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, 
READ_ELEMENT_METRIC_NAME)));
+
+    final PipelineResult readResult = readPipeline.run();
+    final PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeout(readResult, readState);
+
+    assertEquals(sourceOptions.numRecords, readElementMetric(readResult));
+
+    if (!options.isWithTestcontainers()) {
+      Set<NamedTestResult> metrics = readMetrics(readResult);
+      IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
+    }
+  }
+}
