This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch kafka-service-registry-managed-example in repository https://gitbox.apache.org/repos/asf/camel-quarkus-examples.git
commit 47e8eac73f74e312df78cae5637cbec875dbbcc4
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Oct 19 10:01:20 2021 +0200

    Added a Kafka Example based on Managed Services for Kafka and Service Registry
---
 kafka-service-registry/README.md                   |  41 ++
 .../kafka-registry-consumer/README.adoc            | 228 +++++++++++
 .../kubefiles/secret-example.yml                   |  28 ++
 .../kafka-registry-consumer/pom.xml                | 244 ++++++++++++
 .../src/main/docker/Dockerfile.jvm                 |  71 ++++
 .../src/main/docker/Dockerfile.native              |  44 +++
 .../src/main/java/org/acme/kafka/Routes.java       |  35 ++
 .../src/main/resources/application.properties      |  39 ++
 .../kafka-registry-producer/README.adoc            |  10 +
 .../kafka-registry-producer/pom.xml                | 234 +++++++++++
 .../src/main/docker/Dockerfile.jvm                 |  71 ++++
 .../src/main/docker/Dockerfile.native              |  44 +++
 .../src/main/java/org/acme/kafka/Routes.java       |  37 ++
 .../src/main/java/org/acme/kafka/User.java         | 435 +++++++++++++++++++++
 .../src/main/resources/application.properties      |  40 ++
 .../kafka-registry-producer/user.avsc              |  15 +
 kafka-service-registry/pom.xml                     |  20 +
 17 files changed, 1636 insertions(+)

diff --git a/kafka-service-registry/README.md b/kafka-service-registry/README.md
new file mode 100644
index 0000000..9a72f37
--- /dev/null
+++ b/kafka-service-registry/README.md
@@ -0,0 +1,41 @@
+= Managed Kafka and Service Registry Example with Camel-Quarkus
+
+:cq-example-description: An example that shows how to produce and consume messages in a Kafka topic, using Managed Kafka and Managed Service Registry from Red Hat Cloud.
+
+{cq-description}
+
+TIP: Check the https://camel.apache.org/camel-quarkus/latest/first-steps.html[Camel Quarkus User guide] for prerequisites
+and other general information.
+
+== Prerequisites
+
+1. Create Kafka Managed Service instance on cloud.redhat.com
+
+2. Create associated Service Account, save client Id and Client Secret
+
+3. Create Service Registry Managed instance on cloud.redhat.com
+
+4. Populate correctly the producer application.properties file with the missing parameters
+
+5. Populate correctly the consumer application.properties file with the missing parameters
+
+6. From the Service Registry Managed Instance UI load the user.avsc as schema named 'test-value' with no group
+
+Notes:
+- The class User has been generated starting from the avsc user schema, through the avro tools
+
+== Run Example
+
+1. From the producer folder run
+
+    mvn clean compile package
+    java -jar target/quarkus-app/quarkus-run.jar
+
+2. From the consumer folder run
+
+    mvn clean compile package
+    java -Dquarkus.http.port=8081 -jar target/quarkus-app/quarkus-run.jar
+
+== Feedback
+
+Please report bugs and propose improvements via https://github.com/apache/camel-quarkus/issues[GitHub issues of Camel Quarkus] project.
diff --git a/kafka-service-registry/kafka-registry-consumer/README.adoc b/kafka-service-registry/kafka-registry-consumer/README.adoc
new file mode 100644
index 0000000..9476221
--- /dev/null
+++ b/kafka-service-registry/kafka-registry-consumer/README.adoc
@@ -0,0 +1,228 @@
+= Kafka example : A Camel Quarkus example
+:cq-example-description: An example that shows how to produce and consume messages in a Kafka topic, using Strimzi Operator
+
+{cq-description}
+
+TIP: Check the https://camel.apache.org/camel-quarkus/latest/first-steps.html[Camel Quarkus User guide] for prerequisites
+and other general information.
+
+
+== Prerequisites
+
+The example application requires a Kafka instance.
+
+You do not need to provide the Kafka instance yourself
+as long as you play with the example code in dev mode (a.k.a. `mvn quarkus:dev` - read more https://quarkus.io/guides/getting-started#development-mode[here])
+or as long as you only run the supplied tests (`mvn test`).
+In those situations, Quarkus tooling starts a Redpanda image for you via https://quarkus.io/guides/kafka-dev-services[Quarkus Dev Services]
+and it also configures the application so that you do not need to touch anything in `application.properties`.
+
+== Start in Development mode
+
+Run the application in development mode.
+
+TIP: If you want to use another running instance in dev mode, uncomment the corresponding Kafka configuration section in `src/main/resources/application.properties` and change `%prod` profile to `%dev`.
+
+[source,shell]
+----
+$ mvn clean compile quarkus:dev
+----
+
+The above command compiles the project, starts the application and lets the Quarkus tooling watch for changes in your
+workspace. Any modifications in your project will automatically take effect in the running application.
+
+TIP: Please refer to the Development mode section of
+https://camel.apache.org/camel-quarkus/latest/first-steps.html#_development_mode[Camel Quarkus User guide] for more details.
+
+You should start to see some log messages appearing on the console.
+
+Every 10 seconds the timer component triggers the generation of a random message and sends it to the Kafka topic `Test`.
+
+[source,shell]
+----
+[FromTimer2Kafka] (Camel (camel-1) thread #2 - KafkaProducer[test]) Message sent correctly sent to the topic! : "Message #1"
+----
+
+Next a Kafka consumer reads the messages and puts them in a seda queue.
+
+[source,shell]
+----
+[FromKafka2Seda] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Received : "Message #1"
+----
+
+Next pull a message from the queue :
+[source,shell]
+----
+$ curl -X GET http://0.0.0.0:8080/example
+----
+
+
+=== Configure Kafka client, package and run the application
+
+Once you are done with developing you may want to configure your kafka client, package and run the application.
+
+TIP: Find more details about the JVM mode and Native mode in the Package and run section of
+https://camel.apache.org/camel-quarkus/latest/first-steps.html#_package_and_run_the_application[Camel Quarkus User guide]
+
+==== Configure kafka client
+Uncomment the corresponding commented section in `src/main/resources/application.properties`.
+
+- The section Kafka instance without Authentication if no authentication is required.
+- The section Kafka instance with SASL Plain if using SASL.
+- The section Kafka instance with SASL Oauth Bearer if using Oauth Bearer.
+
+You need to set the corresponding environment variables:
+- Without Authentication
+[source,shell]
+----
+$ export brokers=<YOUR_KAFKA_BROKERS_URL>
+----
+- SASL Plain
+[source,shell]
+----
+$ export brokers=<YOUR_KAFKA_BROKERS_URL>
+$ export id=<YOUR_KAFKA_SASL_CLIENT_ID>
+$ export secret=<YOUR_KAFKA_SASL_CLIENT_SECRET>
+----
+- SASL Oauth Bearer
+[source,shell]
+----
+$ export brokers=<YOUR_KAFKA_BROKERS_URL>
+$ export id=<YOUR_KAFKA_SASL_CLIENT_ID>
+$ export secret=<YOUR_KAFKA_SASL_CLIENT_SECRET>
+$ export token=<YOUR_KAFKA_SASL_OAUTHBEARER_TOKEN_URL>
+----
+
+If you want to deploy on Kubernetes or Openshift, you'd need to define those in a secret named `camel-kafka`. Set the needed values in the `kubefiles/secret-example.yml`, then add the secret :
+[source,shell]
+----
+$ kubectl apply -f kubefiles/secret-example.yml
+----
+
+==== JVM mode
+
+[source,shell]
+----
+$ mvn clean package -DskipTests
+$ java -jar target/quarkus-app/quarkus-run.jar
+----
+
+==== Native mode
+
+IMPORTANT: Native mode requires having GraalVM and other tools installed. Please check the Prerequisites section
+of https://camel.apache.org/camel-quarkus/latest/first-steps.html#_prerequisites[Camel Quarkus User guide].
+
+To prepare a native executable using GraalVM, run the following command:
+
+[source,shell]
+----
+$ mvn clean package -DskipTests -Pnative
+$ ./target/*-runner
+----
+
+==== Deploying to Kubernetes
+
+You can build a container image for the application like this. Refer to the https://quarkus.io/guides/deploying-to-kubernetes[Quarkus Kubernetes guide] for options around customizing image names, registries etc.
+
+This example uses Jib to create the container image for Kubernetes deployment.
+
+Uncomment the settings for creating the container with Jib and secrets, in the Kubernetes specific section in `src/main/resources/application.properties`. Set image group and image registry.
+
+Build the application using the `kubernetes` profile.
+
+[source,shell]
+----
+$ mvn clean package -DskipTests -Dkubernetes
+----
+
+The `kubernetes` profile uses quarkus kubernetes and jib container extensions, as described in the `pom.xml`.
+
+[source,xml]
+----
+<dependencies>
+    <dependency>
+        <groupId>io.quarkus</groupId>
+        <artifactId>quarkus-kubernetes</artifactId>
+    </dependency>
+    <dependency>
+        <groupId>io.quarkus</groupId>
+        <artifactId>quarkus-container-image-jib</artifactId>
+    </dependency>
+</dependencies>
+----
+
+If you are using a local development cluster like Kind or k3s, you can host the container image on your local host. Or, with minikube, use the Docker daemon from the cluster virtual machine `eval $(minikube docker-env)`. Otherwise, you'll need to push the image to a registry of your choosing.
+
+TIP: You can build & deploy in one single step by doing `mvn clean package -DskipTests -Dkubernetes -Dquarkus.kubernetes.deploy=true`
+
+Check that the pods are running.
+
+Example when using Strimzi operator, with a Kafka instance named `Test` :
+
+[source,shell]
+----
+$ kubectl get pods
+NAME                                           READY   STATUS    RESTARTS   AGE
+camel-quarkus-examples-kafka-dbc56974b-ph29m   1/1     Running   0          2m34s
+test-entity-operator-7cccff5899-dlfx8          3/3     Running   0          48m
+test-kafka-0                                   1/1     Running   0          49m
+test-kafka-1                                   1/1     Running   0          49m
+test-kafka-2                                   1/1     Running   0          49m
+test-zookeeper-0                               1/1     Running   0          50m
+test-zookeeper-1                               1/1     Running   0          50m
+test-zookeeper-2                               1/1     Running   0          50m
+
+----
+
+Tail the application logs.
+
+[source,shell]
+----
+$ kubectl logs -f camel-quarkus-examples-kafka-dbc56974b-ph29m
+----
+
+To clean up do.
+
+[source,shell]
+----
+$ kubectl delete all -l app.kubernetes.io/name=camel-quarkus-examples-kafka
+$ kubectl delete secret camel-kafka
+----
+
+[NOTE]
+====
+If you need to configure container resource limits & requests, or enable the Quarkus Kubernetes client to trust self signed certificates, you can find these configuration options in `src/main/resources/application.properties`. Simply uncomment them and set your desired values.
+====
+
+
+==== Deploying to OpenShift
+
+Uncomment the settings for creating the container with OpenShift and secrets, in the Openshift specific section in `src/main/resources/application.properties`.
+
+
+[source,shell]
+----
+$ mvn clean package -DskipTests -Dquarkus.kubernetes.deploy=true -Dopenshift
+----
+
+The `openshift` profile uses quarkus openshift and openshift-container extensions, as described in the `pom.xml`.
+
+[source,xml]
+----
+<dependencies>
+    <dependency>
+        <groupId>io.quarkus</groupId>
+        <artifactId>quarkus-openshift</artifactId>
+    </dependency>
+    <dependency>
+        <groupId>io.quarkus</groupId>
+        <artifactId>quarkus-container-image-openshift</artifactId>
+    </dependency>
+</dependencies>
+----
+
+You can check the pod status and tail logs using the commands mentioned above in the Kubernetes section. Use the `oc` binary instead of `kubectl` if preferred.
+ +== Feedback + +Please report bugs and propose improvements via https://github.com/apache/camel-quarkus/issues[GitHub issues of Camel Quarkus] project. diff --git a/kafka-service-registry/kafka-registry-consumer/kubefiles/secret-example.yml b/kafka-service-registry/kafka-registry-consumer/kubefiles/secret-example.yml new file mode 100644 index 0000000..72499b2 --- /dev/null +++ b/kafka-service-registry/kafka-registry-consumer/kubefiles/secret-example.yml @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: v1 +kind: Secret +metadata: + name: camel-kafka + namespace: test +type: Opaque +stringData: + brokers: "<YOUR_KAFKA_BROKERS_URL>" + id: "<YOUR_KAFKA_SASL_CLIENT_ID>" + secret: "<YOUR_KAFKA_SASL_CLIENT_SECRET>" + token: "<YOUR_KAFKA_SASL_OAUTHBEARER_TOKEN_URL>" \ No newline at end of file diff --git a/kafka-service-registry/kafka-registry-consumer/pom.xml b/kafka-service-registry/kafka-registry-consumer/pom.xml new file mode 100644 index 0000000..fc34ad9 --- /dev/null +++ b/kafka-service-registry/kafka-registry-consumer/pom.xml @@ -0,0 +1,244 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>camel-quarkus-examples-kafka-service-registry-example</artifactId> + <groupId>org.apache.camel.quarkus.examples</groupId> + <version>2.3.0</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>camel-quarkus-examples-kafka-service-registry-consumer</artifactId> + + <name>Camel Quarkus :: Examples :: Kafka Service Registry Consumer</name> + <description>Camel Quarkus Example :: Kafka Service Registry Consumer</description> + + <properties> + <quarkus.platform.version>2.3.0.Final</quarkus.platform.version> + <camel-quarkus.platform.version>${quarkus.platform.version}</camel-quarkus.platform.version> + + <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id> + <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> + <camel-quarkus.platform.group-id>${quarkus.platform.group-id}</camel-quarkus.platform.group-id> + <camel-quarkus.platform.artifact-id>quarkus-camel-bom</camel-quarkus.platform.artifact-id> + + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + 
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <maven.compiler.target>11</maven.compiler.target> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.testTarget>${maven.compiler.target}</maven.compiler.testTarget> + <maven.compiler.testSource>${maven.compiler.source}</maven.compiler.testSource> + + <formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version> + <impsort-maven-plugin.version>1.3.2</impsort-maven-plugin.version> + <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version> + <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> + <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version> + <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> + <mycila-license.version>3.0</mycila-license.version> + <kafka-oauth-client.version>0.7.2</kafka-oauth-client.version> + <apicurio-registry.version>2.1.0.Final</apicurio-registry.version> + </properties> + + <dependencyManagement> + <dependencies> + <!-- Import BOM --> + <dependency> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>${quarkus.platform.artifact-id}</artifactId> + <version>${quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>${camel-quarkus.platform.group-id}</groupId> + <artifactId>${camel-quarkus.platform.artifact-id}</artifactId> + <version>${camel-quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>io.strimzi</groupId> + <artifactId>kafka-oauth-client</artifactId> + <version>${kafka-oauth-client.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log</artifactId> + </dependency> + 
<dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-timer</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-seda</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jackson</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-kubernetes-config</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-apicurio-registry-avro</artifactId> + </dependency> + <dependency> + <groupId>io.apicurio</groupId> + <artifactId>apicurio-registry-serdes-avro-serde</artifactId> + <version>${apicurio-registry.version}</version> + </dependency> + + <!-- For oauth use case --> + <dependency> + <groupId>io.strimzi</groupId> + <artifactId>kafka-oauth-client</artifactId> + </dependency> + + <!-- Test --> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin.version}</version> + <configuration> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + <compilerArgs> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + 
<version>${maven-surefire-plugin.version}</version> + <configuration> + <failIfNoTests>false</failIfNoTests> + <systemProperties> + <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager> + </systemProperties> + </configuration> + </plugin> + + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus.platform.version}</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + </plugin> + + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <version>${mycila-license.version}</version> + <configuration> + <failIfUnknown>true</failIfUnknown> + <header>${maven.multiModuleProjectDirectory}/header.txt</header> + <excludes> + <exclude>**/*.adoc</exclude> + <exclude>**/*.txt</exclude> + <exclude>**/LICENSE.txt</exclude> + <exclude>**/LICENSE</exclude> + <exclude>**/NOTICE.txt</exclude> + <exclude>**/NOTICE</exclude> + <exclude>**/README</exclude> + <exclude>**/pom.xml.versionsBackup</exclude> + </excludes> + <mapping> + <java>SLASHSTAR_STYLE</java> + <properties>CAMEL_PROPERTIES_STYLE</properties> + <Dockerfile.jvm>CAMEL_PROPERTIES_STYLE</Dockerfile.jvm> + <Dockerfile.native>CAMEL_PROPERTIES_STYLE</Dockerfile.native> + </mapping> + <headerDefinitions> + <headerDefinition>${maven.multiModuleProjectDirectory}/license-properties-headerdefinition.xml</headerDefinition> + </headerDefinitions> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <executions> + <execution> + <id>build</id> + <goals> + <goal>build</goal> 
+ </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/kafka-service-registry/kafka-registry-consumer/src/main/docker/Dockerfile.jvm b/kafka-service-registry/kafka-registry-consumer/src/main/docker/Dockerfile.jvm new file mode 100644 index 0000000..1e65c99 --- /dev/null +++ b/kafka-service-registry/kafka-registry-consumer/src/main/docker/Dockerfile.jvm @@ -0,0 +1,71 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the container image run: +# +# ./mvnw package +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/camel-quarkus-examples-kafka-jvm . 
+# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/camel-quarkus-examples-kafka-jvm +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005) like this : EXPOSE 8080 5005 +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/camel-quarkus-examples-kafka-jvm +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 + +ARG JAVA_PACKAGE=java-11-openjdk-headless +ARG RUN_JAVA_VERSION=1.3.8 +ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' +# Install java and the run-java script +# Also set up permissions for user `1001` +RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \ + && microdnf update \ + && microdnf clean all \ + && mkdir /deployments \ + && chown 1001 /deployments \ + && chmod "g+rwX" /deployments \ + && chown 1001:root /deployments \ + && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \ + && chown 1001 /deployments/run-java.sh \ + && chmod 540 /deployments/run-java.sh \ + && echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/conf/security/java.security + +# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size. 
+ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +# We make four distinct layers so if there are application changes the library layers can be re-used +COPY --chown=1001 target/quarkus-app/lib/ /deployments/lib/ +COPY --chown=1001 target/quarkus-app/*.jar /deployments/ +COPY --chown=1001 target/quarkus-app/app/ /deployments/app/ +COPY --chown=1001 target/quarkus-app/quarkus/ /deployments/quarkus/ + +EXPOSE 8080 +USER 1001 + +ENTRYPOINT [ "/deployments/run-java.sh" ] \ No newline at end of file diff --git a/kafka-service-registry/kafka-registry-consumer/src/main/docker/Dockerfile.native b/kafka-service-registry/kafka-registry-consumer/src/main/docker/Dockerfile.native new file mode 100644 index 0000000..04038f5 --- /dev/null +++ b/kafka-service-registry/kafka-registry-consumer/src/main/docker/Dockerfile.native @@ -0,0 +1,44 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode +# +# Before building the container image run: +# +# ./mvnw package -Pnative +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.native -t quarkus/camel-quarkus-examples-kafka . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/camel-quarkus-examples-kafka +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 +WORKDIR /work/ +RUN chown 1001 /work \ + && chmod "g+rwX" /work \ + && chown 1001:root /work +COPY --chown=1001:root target/*-runner /work/application + +EXPOSE 8080 +USER 1001 + +CMD ["./application", "-Dquarkus.http.host=0.0.0.0"] \ No newline at end of file diff --git a/kafka-service-registry/kafka-registry-consumer/src/main/java/org/acme/kafka/Routes.java b/kafka-service-registry/kafka-registry-consumer/src/main/java/org/acme/kafka/Routes.java new file mode 100644 index 0000000..d97a31d --- /dev/null +++ b/kafka-service-registry/kafka-registry-consumer/src/main/java/org/acme/kafka/Routes.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.acme.kafka;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.apache.camel.builder.RouteBuilder;
+
+@ApplicationScoped
+public class Routes extends RouteBuilder {
+
+    @Override
+    public void configure() throws Exception {
+
+        // kafka consumer
+        from("kafka:{{kafka.topic.name}}")
+                .routeId("FromKafka2Seda")
+                .log("Received : \"${body}\"")
+                .to("seda:kafka-messages");
+    }
+}
diff --git a/kafka-service-registry/kafka-registry-consumer/src/main/resources/application.properties b/kafka-service-registry/kafka-registry-consumer/src/main/resources/application.properties
new file mode 100644
index 0000000..ff595ba
--- /dev/null
+++ b/kafka-service-registry/kafka-registry-consumer/src/main/resources/application.properties
@@ -0,0 +1,39 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#Kafka topic Name
+kafka.topic.name=test
+
+# How often should the messages be generated and pushed to Kafka Topic
+timer.period = 10000
+timer.delay = 10000
+
+# uncomment to set Kafka instance with SASL Oauth Bearer
+camel.component.kafka.brokers = <brokers>
+camel.component.kafka.security-protocol = SASL_SSL
+camel.component.kafka.sasl-mechanism = OAUTHBEARER
+camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+        oauth.client.id="<client_id>" \
+        oauth.client.secret="<client_secret>" \
+        oauth.token.endpoint.uri="<token_endpoint_url>" ;
+camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+camel.component.kafka.additional-properties[apicurio.registry.url] = <registry_url>
+camel.component.kafka.additional-properties[apicurio.auth.client.id] = <client_id>
+camel.component.kafka.additional-properties[apicurio.auth.client.secret] = <client_secret>
+camel.component.kafka.additional-properties[apicurio.auth.service.token.endpoint] = <token_endpoint_url>
+camel.component.kafka.additional-properties[apicurio.registry.use-specific-avro-reader] = true
+camel.component.kafka.valueDeserializer = io.apicurio.registry.serde.avro.AvroKafkaDeserializer
diff --git a/kafka-service-registry/kafka-registry-producer/README.adoc b/kafka-service-registry/kafka-registry-producer/README.adoc
new file mode 100644
index 0000000..5db5c90
--- /dev/null
+++ b/kafka-service-registry/kafka-registry-producer/README.adoc
@@ -0,0 +1,10 @@
+== Camel Quarkus Kafka with Service Registry
+
+1. Populate correctly the credentials in application.properties after creating kafka instance and registry instance
+2. mvn clean package
+3. java -jar target/quarkus-app/quarkus-run.jar
+
+Error:
+
+2021-10-11 16:50:21,197 WARN [org.apa.cam.com.tim.TimerConsumer] (Camel (camel-1) thread #0 - timer://foo) Error processing exchange. Exchange[57AD50F0C45CD89-0000000000000000]. Caused by: [io.apicurio.registry.rest.client.exception.ArtifactNotFoundException - No artifact with ID 'test-value' in group 'null' was found.]: io.apicurio.registry.rest.client.exception.ArtifactNotFoundException: No artifact with ID 'test-value' in group 'null' was found.
+
diff --git a/kafka-service-registry/kafka-registry-producer/pom.xml b/kafka-service-registry/kafka-registry-producer/pom.xml
new file mode 100644
index 0000000..a9c4839
--- /dev/null
+++ b/kafka-service-registry/kafka-registry-producer/pom.xml
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>camel-quarkus-examples-kafka-service-registry-example</artifactId> + <groupId>org.apache.camel.quarkus.examples</groupId> + <version>2.3.0</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>camel-quarkus-examples-kafka-service-registry-producer</artifactId> + + <name>Camel Quarkus :: Examples :: Kafka Service Registry Producer</name> + <description>Camel Quarkus Example :: Kafka Service Registry Producer</description> + + <properties> + <quarkus.platform.version>2.3.0.Final</quarkus.platform.version> + <camel-quarkus.platform.version>${quarkus.platform.version}</camel-quarkus.platform.version> + + <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id> + <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> + <camel-quarkus.platform.group-id>${quarkus.platform.group-id}</camel-quarkus.platform.group-id> + <camel-quarkus.platform.artifact-id>quarkus-camel-bom</camel-quarkus.platform.artifact-id> + + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <maven.compiler.target>11</maven.compiler.target> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.testTarget>${maven.compiler.target}</maven.compiler.testTarget> + <maven.compiler.testSource>${maven.compiler.source}</maven.compiler.testSource> + + <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version> + <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> + <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version> + <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> + <mycila-license.version>3.0</mycila-license.version> 
+ <kafka-oauth-client.version>0.7.2</kafka-oauth-client.version> + <apicurio-registry.version>2.1.0.Final</apicurio-registry.version> + </properties> + + <dependencyManagement> + <dependencies> + <!-- Import BOM --> + <dependency> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>${quarkus.platform.artifact-id}</artifactId> + <version>${quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>${camel-quarkus.platform.group-id}</groupId> + <artifactId>${camel-quarkus.platform.artifact-id}</artifactId> + <version>${camel-quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>io.strimzi</groupId> + <artifactId>kafka-oauth-client</artifactId> + <version>${kafka-oauth-client.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-microprofile-health</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-timer</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-seda</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jackson</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-kubernetes-config</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-apicurio-registry-avro</artifactId> + </dependency> + <dependency> + <groupId>io.apicurio</groupId> + 
<artifactId>apicurio-registry-serdes-avro-serde</artifactId> + <version>${apicurio-registry.version}</version> + </dependency> + + <!-- For oauth use case --> + <dependency> + <groupId>io.strimzi</groupId> + <artifactId>kafka-oauth-client</artifactId> + </dependency> + + <!-- Test --> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin.version}</version> + <configuration> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + <compilerArgs> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus.platform.version}</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + </plugin> + + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <version>${mycila-license.version}</version> + <configuration> + <failIfUnknown>true</failIfUnknown> + <header>${maven.multiModuleProjectDirectory}/header.txt</header> + 
<excludes> + <exclude>**/*.adoc</exclude> + <exclude>**/*.txt</exclude> + <exclude>**/LICENSE.txt</exclude> + <exclude>**/LICENSE</exclude> + <exclude>**/NOTICE.txt</exclude> + <exclude>**/NOTICE</exclude> + <exclude>**/README</exclude> + <exclude>**/pom.xml.versionsBackup</exclude> + </excludes> + <mapping> + <java>SLASHSTAR_STYLE</java> + <properties>CAMEL_PROPERTIES_STYLE</properties> + <Dockerfile.jvm>CAMEL_PROPERTIES_STYLE</Dockerfile.jvm> + <Dockerfile.native>CAMEL_PROPERTIES_STYLE</Dockerfile.native> + </mapping> + <headerDefinitions> + <headerDefinition>${maven.multiModuleProjectDirectory}/license-properties-headerdefinition.xml</headerDefinition> + </headerDefinitions> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <executions> + <execution> + <id>build</id> + <goals> + <goal>build</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/kafka-service-registry/kafka-registry-producer/src/main/docker/Dockerfile.jvm b/kafka-service-registry/kafka-registry-producer/src/main/docker/Dockerfile.jvm new file mode 100644 index 0000000..1e65c99 --- /dev/null +++ b/kafka-service-registry/kafka-registry-producer/src/main/docker/Dockerfile.jvm @@ -0,0 +1,71 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. 
You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the container image run: +# +# ./mvnw package +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/camel-quarkus-examples-kafka-jvm . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/camel-quarkus-examples-kafka-jvm +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005) like this : EXPOSE 8080 5005 +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/camel-quarkus-examples-kafka-jvm +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 + +ARG JAVA_PACKAGE=java-11-openjdk-headless +ARG RUN_JAVA_VERSION=1.3.8 +ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' +# Install java and the run-java script +# Also set up permissions for user `1001` +RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \ + && microdnf update \ + && microdnf clean all \ + && mkdir /deployments \ + && chown 1001 /deployments \ + && chmod "g+rwX" /deployments \ + && chown 1001:root /deployments \ + && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \ + && chown 1001 /deployments/run-java.sh \ + && chmod 540 /deployments/run-java.sh \ + && echo 
"securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/conf/security/java.security + +# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size. +ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +# We make four distinct layers so if there are application changes the library layers can be re-used +COPY --chown=1001 target/quarkus-app/lib/ /deployments/lib/ +COPY --chown=1001 target/quarkus-app/*.jar /deployments/ +COPY --chown=1001 target/quarkus-app/app/ /deployments/app/ +COPY --chown=1001 target/quarkus-app/quarkus/ /deployments/quarkus/ + +EXPOSE 8080 +USER 1001 + +ENTRYPOINT [ "/deployments/run-java.sh" ] \ No newline at end of file diff --git a/kafka-service-registry/kafka-registry-producer/src/main/docker/Dockerfile.native b/kafka-service-registry/kafka-registry-producer/src/main/docker/Dockerfile.native new file mode 100644 index 0000000..04038f5 --- /dev/null +++ b/kafka-service-registry/kafka-registry-producer/src/main/docker/Dockerfile.native @@ -0,0 +1,44 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode +# +# Before building the container image run: +# +# ./mvnw package -Pnative +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.native -t quarkus/camel-quarkus-examples-kafka . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/camel-quarkus-examples-kafka +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 +WORKDIR /work/ +RUN chown 1001 /work \ + && chmod "g+rwX" /work \ + && chown 1001:root /work +COPY --chown=1001:root target/*-runner /work/application + +EXPOSE 8080 +USER 1001 + +CMD ["./application", "-Dquarkus.http.host=0.0.0.0"] \ No newline at end of file diff --git a/kafka-service-registry/kafka-registry-producer/src/main/java/org/acme/kafka/Routes.java b/kafka-service-registry/kafka-registry-producer/src/main/java/org/acme/kafka/Routes.java new file mode 100644 index 0000000..cdd2aeb --- /dev/null +++ b/kafka-service-registry/kafka-registry-producer/src/main/java/org/acme/kafka/Routes.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.kafka; + +import javax.enterprise.context.ApplicationScoped; +import org.apache.camel.builder.RouteBuilder; + +@ApplicationScoped +public class Routes extends RouteBuilder { + + @Override + public void configure() throws Exception { + // produces messages to kafka + + User user = new User(); + user.setName("John"); + user.setAge(36); + + from("timer:foo?period={{timer.period}}&delay={{timer.delay}}").routeId("FromTimer2Kafka") + .setBody(constant(user)).to("kafka:{{kafka.topic.name}}") + .log("Message correctly sent to the topic! : \"${body}\" "); + } +} diff --git a/kafka-service-registry/kafka-registry-producer/src/main/java/org/acme/kafka/User.java b/kafka-service-registry/kafka-registry-producer/src/main/java/org/acme/kafka/User.java new file mode 100644 index 0000000..7b18fe4 --- /dev/null +++ b/kafka-service-registry/kafka-registry-producer/src/main/java/org/acme/kafka/User.java @@ -0,0 +1,435 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.acme.kafka; + +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.SchemaStore; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; + +@org.apache.avro.specific.AvroGenerated +public class User extends org.apache.avro.specific.SpecificRecordBase + implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 1548979804423630989L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.acme.kafka\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"); + + public static org.apache.avro.Schema getClassSchema() { + return SCHEMA$; + } + + private static SpecificData 
MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder<User> ENCODER = new BinaryMessageEncoder<User>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder<User> DECODER = new BinaryMessageDecoder<User>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder<User> getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder<User> getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * + * @param resolver + * a {@link SchemaStore} used to find schemas by fingerprint + * + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<User>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this User to a ByteBuffer. + * + * @return a buffer holding the serialized data for this instance + * + * @throws java.io.IOException + * if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a User from a ByteBuffer. + * + * @param b + * a byte buffer holding serialized data for an instance of this class + * + * @return a User instance decoded from the given buffer + * + * @throws java.io.IOException + * if the given bytes could not be deserialized into an instance of this class + */ + public static User fromByteBuffer(java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private java.lang.CharSequence name; + private int age; + + /** + * Default constructor. 
Note that this does not initialize fields to their default values from the schema. If that + * is desired then one should use <code>newBuilder()</code>. + */ + public User() { + } + + /** + * All-args constructor. + * + * @param name + * The new value for name + * @param age + * The new value for age + */ + public User(java.lang.CharSequence name, java.lang.Integer age) { + this.name = name; + this.age = age; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { + return MODEL$; + } + + public org.apache.avro.Schema getSchema() { + return SCHEMA$; + } + + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: + return name; + case 1: + return age; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value = "unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: + name = (java.lang.CharSequence) value$; + break; + case 1: + age = (java.lang.Integer) value$; + break; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'name' field. + * + * @return The value of the 'name' field. + */ + public java.lang.CharSequence getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * + * @param value + * the value to set. + */ + public void setName(java.lang.CharSequence value) { + this.name = value; + } + + /** + * Gets the value of the 'age' field. + * + * @return The value of the 'age' field. + */ + public int getAge() { + return age; + } + + /** + * Sets the value of the 'age' field. + * + * @param value + * the value to set. + */ + public void setAge(int value) { + this.age = value; + } + + /** + * Creates a new User RecordBuilder. 
+ * + * @return A new User RecordBuilder + */ + public static org.acme.kafka.User.Builder newBuilder() { + return new org.acme.kafka.User.Builder(); + } + + /** + * Creates a new User RecordBuilder by copying an existing Builder. + * + * @param other + * The existing builder to copy. + * + * @return A new User RecordBuilder + */ + public static org.acme.kafka.User.Builder newBuilder(org.acme.kafka.User.Builder other) { + if (other == null) { + return new org.acme.kafka.User.Builder(); + } else { + return new org.acme.kafka.User.Builder(other); + } + } + + /** + * Creates a new User RecordBuilder by copying an existing User instance. + * + * @param other + * The existing instance to copy. + * + * @return A new User RecordBuilder + */ + public static org.acme.kafka.User.Builder newBuilder(org.acme.kafka.User other) { + if (other == null) { + return new org.acme.kafka.User.Builder(); + } else { + return new org.acme.kafka.User.Builder(other); + } + } + + /** + * RecordBuilder for User instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> + implements org.apache.avro.data.RecordBuilder<User> { + + private java.lang.CharSequence name; + private int age; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * + * @param other + * The existing Builder to copy. + */ + private Builder(org.acme.kafka.User.Builder other) { + super(other); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.age)) { + this.age = data().deepCopy(fields()[1].schema(), other.age); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing User instance + * + * @param other + * The existing instance to copy. 
+ */ + private Builder(org.acme.kafka.User other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.age)) { + this.age = data().deepCopy(fields()[1].schema(), other.age); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'name' field. + * + * @return The value. + */ + public java.lang.CharSequence getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * + * @param value + * The value of 'name'. + * + * @return This builder. + */ + public org.acme.kafka.User.Builder setName(java.lang.CharSequence value) { + validate(fields()[0], value); + this.name = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'name' field has been set. + * + * @return True if the 'name' field has been set, false otherwise. + */ + public boolean hasName() { + return fieldSetFlags()[0]; + } + + /** + * Clears the value of the 'name' field. + * + * @return This builder. + */ + public org.acme.kafka.User.Builder clearName() { + name = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'age' field. + * + * @return The value. + */ + public int getAge() { + return age; + } + + /** + * Sets the value of the 'age' field. + * + * @param value + * The value of 'age'. + * + * @return This builder. + */ + public org.acme.kafka.User.Builder setAge(int value) { + validate(fields()[1], value); + this.age = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'age' field has been set. + * + * @return True if the 'age' field has been set, false otherwise. + */ + public boolean hasAge() { + return fieldSetFlags()[1]; + } + + /** + * Clears the value of the 'age' field. + * + * @return This builder. 
+ */ + public org.acme.kafka.User.Builder clearAge() { + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public User build() { + try { + User record = new User(); + record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); + record.age = fieldSetFlags()[1] ? this.age : (java.lang.Integer) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter<User> WRITER$ = (org.apache.avro.io.DatumWriter<User>) MODEL$ + .createDatumWriter(SCHEMA$); + + @Override + public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader<User> READER$ = (org.apache.avro.io.DatumReader<User>) MODEL$ + .createDatumReader(SCHEMA$); + + @Override + public void readExternal(java.io.ObjectInput in) throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override + protected boolean hasCustomCoders() { + return true; + } + + @Override + public void customEncode(org.apache.avro.io.Encoder out) throws java.io.IOException { + out.writeString(this.name); + + out.writeInt(this.age); + + } + + @Override + public void customDecode(org.apache.avro.io.ResolvingDecoder in) throws java.io.IOException { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.name = in.readString(this.name instanceof Utf8 ? (Utf8) this.name : null); + + this.age = in.readInt(); + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.name = in.readString(this.name instanceof Utf8 ? 
(Utf8) this.name : null); + break; + + case 1: + this.age = in.readInt(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} diff --git a/kafka-service-registry/kafka-registry-producer/src/main/resources/application.properties b/kafka-service-registry/kafka-registry-producer/src/main/resources/application.properties new file mode 100644 index 0000000..dc65f9f --- /dev/null +++ b/kafka-service-registry/kafka-registry-producer/src/main/resources/application.properties @@ -0,0 +1,40 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +#Kafka topic Name +kafka.topic.name=test + +# How often should the messages be generated and pushed to Kafka Topic +timer.period = 10000 +timer.delay = 10000 + +# Kafka instance with SASL OAUTHBEARER; replace the <placeholders> with your own values +camel.component.kafka.brokers = <brokers> +camel.component.kafka.security-protocol = SASL_SSL +camel.component.kafka.sasl-mechanism = OAUTHBEARER +camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + oauth.client.id="<client_id>" \ + oauth.client.secret="<client_secret>" \ + oauth.token.endpoint.uri="<token_endpoint_url>" ; +camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler +camel.component.kafka.additional-properties[apicurio.registry.url] = <registry_url> +camel.component.kafka.additional-properties[apicurio.auth.client.id] = <client_id> +camel.component.kafka.additional-properties[apicurio.auth.client.secret] = <client_secret> +camel.component.kafka.additional-properties[apicurio.auth.service.token.endpoint] = <token_endpoint_url> +camel.component.kafka.additional-properties[apicurio.registry.avro-datum-provider] = io.apicurio.registry.serde.avro.ReflectAvroDatumProvider +camel.component.kafka.additional-properties[apicurio.registry.artifact-resolver-strategy] = io.apicurio.registry.serde.avro.strategy.RecordIdStrategy +camel.component.kafka.valueSerializer = io.apicurio.registry.serde.avro.AvroKafkaSerializer diff --git a/kafka-service-registry/kafka-registry-producer/user.avsc b/kafka-service-registry/kafka-registry-producer/user.avsc new file mode 100644 index 0000000..7ecaa58 --- /dev/null +++ b/kafka-service-registry/kafka-registry-producer/user.avsc @@ -0,0 +1,15 @@ +{ + "namespace": "org.acme.kafka", + "type": "record", + "name": "User", + "fields": [ + { + "name": "name", + "type": "string" + }, 
+ { + "name": "age", + "type": "int" + } + ] +} diff --git a/kafka-service-registry/pom.xml b/kafka-service-registry/pom.xml new file mode 100644 index 0000000..869645b --- /dev/null +++ b/kafka-service-registry/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>camel-quarkus-examples-kafka-service-registry-example</artifactId> + <groupId>org.apache.camel.quarkus.examples</groupId> + <version>2.3.0</version> + + <name>Camel Quarkus :: Examples :: Kafka Service Registry</name> + <description>Camel Quarkus Example :: Kafka Service Registry</description> + + <packaging>pom</packaging> + + <modules> + <module>kafka-registry-producer</module> + <module>kafka-registry-consumer</module> + </modules> + +</project>
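
Note on the record layout: the generated `User.customEncode` in this commit writes the `name` string and then the `age` int. The following is a minimal sketch, using only the JDK, of the bytes those two writes should produce under Avro's binary encoding (a zig-zag varint length followed by the UTF-8 bytes for the string, then a zig-zag varint for the int). The class and method names (`AvroEncodingSketch`, `writeZigZagVarint`, `encodeUser`) are hypothetical and not part of the commit, and any framing that the registry serializer may add around the record is deliberately not modeled.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; it does not use or replace the Avro library.
public class AvroEncodingSketch {

    // Zig-zag maps signed values to unsigned ones (0, -1, 1, -2 -> 0, 1, 2, 3);
    // the result is then written 7 bits at a time, least significant group first,
    // with the high bit set on every byte except the last.
    public static void writeZigZagVarint(long value, ByteArrayOutputStream out) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // Encodes the two fields in schema order: name (string), then age (int).
    public static byte[] encodeUser(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        writeZigZagVarint(utf8.length, out); // string length prefix
        out.write(utf8, 0, utf8.length);     // string payload
        writeZigZagVarint(age, out);         // int value
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeUser("John", 36)) {
            hex.append(String.format("%02x ", b));
        }
        // prints "08 4a 6f 68 6e 48"
        System.out.println(hex.toString().trim());
    }
}
```

For the `John` / `36` record built in `Routes.java`, the sketch yields six bytes: `08` (length 4), the four UTF-8 bytes of the name, and `48` (36 after zig-zag). Comparing a captured message payload against this can help tell a schema mismatch apart from a registry lookup failure such as the `ArtifactNotFoundException` quoted in the producer README.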