This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch kafka-service-registry-managed-example in repository https://gitbox.apache.org/repos/asf/camel-quarkus-examples.git
commit 06a1cc01f15e37ef33d9c8ba0e354c28e492f7a0 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Oct 11 16:53:38 2021 +0200 Added sample for checking error --- kafka-sample/kafka-registry-prod/README.adoc | 10 + .../kubefiles/secret-example.yml | 28 ++ kafka-sample/kafka-registry-prod/pom.xml | 379 +++++++++++++++++++ .../src/main/docker/Dockerfile.jvm | 71 ++++ .../src/main/docker/Dockerfile.native | 44 +++ .../src/main/java/org/acme/kafka/Routes.java | 39 ++ .../src/main/java/org/acme/kafka/User.java | 418 +++++++++++++++++++++ .../src/main/resources/application.properties | 99 +++++ kafka-sample/kafka-registry-prod/user.avsc | 15 + 9 files changed, 1103 insertions(+) diff --git a/kafka-sample/kafka-registry-prod/README.adoc b/kafka-sample/kafka-registry-prod/README.adoc new file mode 100644 index 0000000..5db5c90 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/README.adoc @@ -0,0 +1,10 @@ +== Camel Quarkus Kafka with Service Registry + +1. After creating the Kafka instance and the registry instance, populate the credentials in application.properties +2. mvn clean package +3. java -jar target/quarkus-app/quarkus-run.jar + +Error: + +2021-10-11 16:50:21,197 WARN [org.apa.cam.com.tim.TimerConsumer] (Camel (camel-1) thread #0 - timer://foo) Error processing exchange. Exchange[57AD50F0C45CD89-0000000000000000]. Caused by: [io.apicurio.registry.rest.client.exception.ArtifactNotFoundException - No artifact with ID 'test-value' in group 'null' was found.]: io.apicurio.registry.rest.client.exception.ArtifactNotFoundException: No artifact with ID 'test-value' in group 'null' was found. 
+ diff --git a/kafka-sample/kafka-registry-prod/kubefiles/secret-example.yml b/kafka-sample/kafka-registry-prod/kubefiles/secret-example.yml new file mode 100644 index 0000000..72499b2 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/kubefiles/secret-example.yml @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: v1 +kind: Secret +metadata: + name: camel-kafka + namespace: test +type: Opaque +stringData: + brokers: "<YOUR_KAFKA_BROKERS_URL>" + id: "<YOUR_KAFKA_SASL_CLIENT_ID>" + secret: "<YOUR_KAFKA_SASL_CLIENT_SECRET>" + token: "<YOUR_KAFKA_SASL_OAUTHBEARER_TOKEN_URL>" \ No newline at end of file diff --git a/kafka-sample/kafka-registry-prod/pom.xml b/kafka-sample/kafka-registry-prod/pom.xml new file mode 100644 index 0000000..051f0a0 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/pom.xml @@ -0,0 +1,379 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>camel-quarkus-examples-kafka-prod</artifactId> + <groupId>org.apache.camel.quarkus.examples</groupId> + <version>2.3.0</version> + + <name>Camel Quarkus :: Examples :: Kafka</name> + <description>Camel Quarkus Example :: Kafka</description> + + <properties> + <quarkus.platform.version>2.3.0.Final</quarkus.platform.version> + <camel-quarkus.platform.version>${quarkus.platform.version}</camel-quarkus.platform.version> + + <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id> + <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> + <camel-quarkus.platform.group-id>${quarkus.platform.group-id}</camel-quarkus.platform.group-id> + <camel-quarkus.platform.artifact-id>quarkus-camel-bom</camel-quarkus.platform.artifact-id> + + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <maven.compiler.target>11</maven.compiler.target> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.testTarget>${maven.compiler.target}</maven.compiler.testTarget> + 
<maven.compiler.testSource>${maven.compiler.source}</maven.compiler.testSource> + + <formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version> + <impsort-maven-plugin.version>1.3.2</impsort-maven-plugin.version> + <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version> + <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> + <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version> + <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> + <mycila-license.version>3.0</mycila-license.version> + <kafka-oauth-client.version>0.7.2</kafka-oauth-client.version> + </properties> + + <dependencyManagement> + <dependencies> + <!-- Import BOM --> + <dependency> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>${quarkus.platform.artifact-id}</artifactId> + <version>${quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>${camel-quarkus.platform.group-id}</groupId> + <artifactId>${camel-quarkus.platform.artifact-id}</artifactId> + <version>${camel-quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>io.strimzi</groupId> + <artifactId>kafka-oauth-client</artifactId> + <version>${kafka-oauth-client.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-microprofile-health</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-timer</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + 
<artifactId>camel-quarkus-seda</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jackson</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-kubernetes-config</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-apicurio-registry-avro</artifactId> + </dependency> + <dependency> + <groupId>io.apicurio</groupId> + <artifactId>apicurio-registry-serdes-avro-serde</artifactId> + <version>2.1.0.Final</version> + </dependency> + + <!-- For oauth use case --> + <dependency> + <groupId>io.strimzi</groupId> + <artifactId>kafka-oauth-client</artifactId> + </dependency> + + <!-- Test --> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + + <plugin> + <groupId>net.revelc.code.formatter</groupId> + <artifactId>formatter-maven-plugin</artifactId> + <version>${formatter-maven-plugin.version}</version> + <configuration> + <configFile>${maven.multiModuleProjectDirectory}/eclipse-formatter-config.xml</configFile> + </configuration> + </plugin> + + <plugin> + <groupId>net.revelc.code</groupId> + <artifactId>impsort-maven-plugin</artifactId> + <version>${impsort-maven-plugin.version}</version> + <configuration> + <groups>java.,javax.,org.w3c.,org.xml.,junit.</groups> + <removeUnused>true</removeUnused> + <staticAfter>true</staticAfter> + <staticGroups>java.,javax.,org.w3c.,org.xml.,junit.</staticGroups> + 
</configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin.version}</version> + <configuration> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + <compilerArgs> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + +<plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <id>schemas</id> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + <goal>protocol</goal> + <goal>idl-protocol</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory> + <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> + </configuration> + </execution> + </executions> +</plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <failIfNoTests>false</failIfNoTests> + <systemProperties> + <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager> + </systemProperties> + </configuration> + </plugin> + + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus.platform.version}</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + </plugin> + + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <version>${mycila-license.version}</version> + <configuration> + 
<failIfUnknown>true</failIfUnknown> + <header>${maven.multiModuleProjectDirectory}/header.txt</header> + <excludes> + <exclude>**/*.adoc</exclude> + <exclude>**/*.txt</exclude> + <exclude>**/LICENSE.txt</exclude> + <exclude>**/LICENSE</exclude> + <exclude>**/NOTICE.txt</exclude> + <exclude>**/NOTICE</exclude> + <exclude>**/README</exclude> + <exclude>**/pom.xml.versionsBackup</exclude> + </excludes> + <mapping> + <java>SLASHSTAR_STYLE</java> + <properties>CAMEL_PROPERTIES_STYLE</properties> + <Dockerfile.jvm>CAMEL_PROPERTIES_STYLE</Dockerfile.jvm> + <Dockerfile.native>CAMEL_PROPERTIES_STYLE</Dockerfile.native> + </mapping> + <headerDefinitions> + <headerDefinition>${maven.multiModuleProjectDirectory}/license-properties-headerdefinition.xml</headerDefinition> + </headerDefinitions> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <executions> + <execution> + <id>build</id> + <goals> + <goal>build</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>net.revelc.code.formatter</groupId> + <artifactId>formatter-maven-plugin</artifactId> + <executions> + <execution> + <id>format</id> + <goals> + <goal>format</goal> + </goals> + <phase>process-sources</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>net.revelc.code</groupId> + <artifactId>impsort-maven-plugin</artifactId> + <executions> + <execution> + <id>sort-imports</id> + <goals> + <goal>sort</goal> + </goals> + <phase>process-sources</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>native</id> + <activation> + <property> + <name>native</name> + </property> + </activation> + <properties> + <quarkus.package.type>native</quarkus.package.type> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>kubernetes</id> + <activation> + <property> + <name>kubernetes</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-kubernetes</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-container-image-jib</artifactId> + </dependency> + </dependencies> + </profile> + <profile> + <id>openshift</id> + <activation> + <property> + <name>openshift</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-openshift</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-container-image-openshift</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> diff --git a/kafka-sample/kafka-registry-prod/src/main/docker/Dockerfile.jvm b/kafka-sample/kafka-registry-prod/src/main/docker/Dockerfile.jvm new file mode 100644 index 0000000..1e65c99 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/src/main/docker/Dockerfile.jvm @@ -0,0 +1,71 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. 
You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the container image run: +# +# ./mvnw package +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/camel-quarkus-examples-kafka-jvm . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/camel-quarkus-examples-kafka-jvm +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005) like this : EXPOSE 8080 5005 +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/camel-quarkus-examples-kafka-jvm +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 + +ARG JAVA_PACKAGE=java-11-openjdk-headless +ARG RUN_JAVA_VERSION=1.3.8 +ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' +# Install java and the run-java script +# Also set up permissions for user `1001` +RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \ + && microdnf update \ + && microdnf clean all \ + && mkdir /deployments \ + && chown 1001 /deployments \ + && chmod "g+rwX" /deployments \ + && chown 1001:root /deployments \ + && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \ + && chown 1001 /deployments/run-java.sh \ + && chmod 540 /deployments/run-java.sh \ + && echo 
"securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/conf/security/java.security + +# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size. +ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +# We make four distinct layers so if there are application changes the library layers can be re-used +COPY --chown=1001 target/quarkus-app/lib/ /deployments/lib/ +COPY --chown=1001 target/quarkus-app/*.jar /deployments/ +COPY --chown=1001 target/quarkus-app/app/ /deployments/app/ +COPY --chown=1001 target/quarkus-app/quarkus/ /deployments/quarkus/ + +EXPOSE 8080 +USER 1001 + +ENTRYPOINT [ "/deployments/run-java.sh" ] \ No newline at end of file diff --git a/kafka-sample/kafka-registry-prod/src/main/docker/Dockerfile.native b/kafka-sample/kafka-registry-prod/src/main/docker/Dockerfile.native new file mode 100644 index 0000000..04038f5 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/src/main/docker/Dockerfile.native @@ -0,0 +1,44 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode +# +# Before building the container image run: +# +# ./mvnw package -Pnative +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.native -t quarkus/camel-quarkus-examples-kafka . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/camel-quarkus-examples-kafka +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 +WORKDIR /work/ +RUN chown 1001 /work \ + && chmod "g+rwX" /work \ + && chown 1001:root /work +COPY --chown=1001:root target/*-runner /work/application + +EXPOSE 8080 +USER 1001 + +CMD ["./application", "-Dquarkus.http.host=0.0.0.0"] \ No newline at end of file diff --git a/kafka-sample/kafka-registry-prod/src/main/java/org/acme/kafka/Routes.java b/kafka-sample/kafka-registry-prod/src/main/java/org/acme/kafka/Routes.java new file mode 100644 index 0000000..7cdbe5f --- /dev/null +++ b/kafka-sample/kafka-registry-prod/src/main/java/org/acme/kafka/Routes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.acme.kafka; + +import javax.enterprise.context.ApplicationScoped; + +import org.apache.camel.builder.RouteBuilder; + +@ApplicationScoped +public class Routes extends RouteBuilder { + + @Override + public void configure() throws Exception { + // produces messages to kafka + + User user = new User(); + user.setName("pippo"); + + from("timer:foo?period={{timer.period}}&delay={{timer.delay}}") + .routeId("FromTimer2Kafka") + .setBody(constant(user)) + .to("kafka:{{kafka.topic.name}}") + .log("Message sent correctly sent to the topic! : \"${body}\" "); + } +} diff --git a/kafka-sample/kafka-registry-prod/src/main/java/org/acme/kafka/User.java b/kafka-sample/kafka-registry-prod/src/main/java/org/acme/kafka/User.java new file mode 100644 index 0000000..4d84b1d --- /dev/null +++ b/kafka-sample/kafka-registry-prod/src/main/java/org/acme/kafka/User.java @@ -0,0 +1,418 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.acme.kafka; + +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.SchemaStore; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; + +@org.apache.avro.specific.AvroGenerated +public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 1548979804423630989L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.acme.kafka\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"); + + public static org.apache.avro.Schema getClassSchema() { + return SCHEMA$; + } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder<User> ENCODER = new BinaryMessageEncoder<User>(MODEL$, SCHEMA$); + + private static final 
BinaryMessageDecoder<User> DECODER = new BinaryMessageDecoder<User>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder<User> getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder<User> getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<User>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this User to a ByteBuffer. + * + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a User from a ByteBuffer. + * + * @param b a byte buffer holding serialized data for an instance of this class + * @return a User instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static User fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private java.lang.CharSequence name; + private int age; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use <code>newBuilder()</code>. 
+ */ + public User() { + } + + /** + * All-args constructor. + * + * @param name The new value for name + * @param age The new value for age + */ + public User(java.lang.CharSequence name, java.lang.Integer age) { + this.name = name; + this.age = age; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { + return MODEL$; + } + + public org.apache.avro.Schema getSchema() { + return SCHEMA$; + } + + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: + return name; + case 1: + return age; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value = "unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: + name = (java.lang.CharSequence) value$; + break; + case 1: + age = (java.lang.Integer) value$; + break; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'name' field. + * + * @return The value of the 'name' field. + */ + public java.lang.CharSequence getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * + * @param value the value to set. + */ + public void setName(java.lang.CharSequence value) { + this.name = value; + } + + /** + * Gets the value of the 'age' field. + * + * @return The value of the 'age' field. + */ + public int getAge() { + return age; + } + + /** + * Sets the value of the 'age' field. + * + * @param value the value to set. + */ + public void setAge(int value) { + this.age = value; + } + + /** + * Creates a new User RecordBuilder. + * + * @return A new User RecordBuilder + */ + public static org.acme.kafka.User.Builder newBuilder() { + return new org.acme.kafka.User.Builder(); + } + + /** + * Creates a new User RecordBuilder by copying an existing Builder. 
+ * + * @param other The existing builder to copy. + * @return A new User RecordBuilder + */ + public static org.acme.kafka.User.Builder newBuilder(org.acme.kafka.User.Builder other) { + if (other == null) { + return new org.acme.kafka.User.Builder(); + } else { + return new org.acme.kafka.User.Builder(other); + } + } + + /** + * Creates a new User RecordBuilder by copying an existing User instance. + * + * @param other The existing instance to copy. + * @return A new User RecordBuilder + */ + public static org.acme.kafka.User.Builder newBuilder(org.acme.kafka.User other) { + if (other == null) { + return new org.acme.kafka.User.Builder(); + } else { + return new org.acme.kafka.User.Builder(other); + } + } + + /** + * RecordBuilder for User instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> + implements org.apache.avro.data.RecordBuilder<User> { + + private java.lang.CharSequence name; + private int age; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * + * @param other The existing Builder to copy. + */ + private Builder(org.acme.kafka.User.Builder other) { + super(other); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.age)) { + this.age = data().deepCopy(fields()[1].schema(), other.age); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing User instance + * + * @param other The existing instance to copy. 
+ */ + private Builder(org.acme.kafka.User other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.age)) { + this.age = data().deepCopy(fields()[1].schema(), other.age); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'name' field. + * + * @return The value. + */ + public java.lang.CharSequence getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * + * @param value The value of 'name'. + * @return This builder. + */ + public org.acme.kafka.User.Builder setName(java.lang.CharSequence value) { + validate(fields()[0], value); + this.name = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'name' field has been set. + * + * @return True if the 'name' field has been set, false otherwise. + */ + public boolean hasName() { + return fieldSetFlags()[0]; + } + + /** + * Clears the value of the 'name' field. + * + * @return This builder. + */ + public org.acme.kafka.User.Builder clearName() { + name = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'age' field. + * + * @return The value. + */ + public int getAge() { + return age; + } + + /** + * Sets the value of the 'age' field. + * + * @param value The value of 'age'. + * @return This builder. + */ + public org.acme.kafka.User.Builder setAge(int value) { + validate(fields()[1], value); + this.age = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'age' field has been set. + * + * @return True if the 'age' field has been set, false otherwise. + */ + public boolean hasAge() { + return fieldSetFlags()[1]; + } + + /** + * Clears the value of the 'age' field. + * + * @return This builder. 
+ */ + public org.acme.kafka.User.Builder clearAge() { /* Only the set-flag is reset; the stored primitive value of age is left as it was. */ + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public User build() { /* Fields never set on this builder fall back to defaultValue(...). NOTE(review): the schema in user.avsc declares no defaults, so building with an unset field presumably ends in AvroMissingFieldException - confirm. */ + try { + User record = new User(); + record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); + record.age = fieldSetFlags()[1] ? this.age : (java.lang.Integer) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { /* rethrown unchanged */ + throw e; + } catch (java.lang.Exception e) { /* any other failure is wrapped in AvroRuntimeException */ + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") /* Datum writer for SCHEMA$, created once from MODEL$ and used by writeExternal. */ + private static final org.apache.avro.io.DatumWriter<User> WRITER$ = (org.apache.avro.io.DatumWriter<User>) MODEL$ + .createDatumWriter(SCHEMA$); + + @Override + public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { /* Writes this record through WRITER$ using the encoder that SpecificData.getEncoder(out) returns. */ + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") /* Datum reader for SCHEMA$, created once from MODEL$ and used by readExternal. */ + private static final org.apache.avro.io.DatumReader<User> READER$ = (org.apache.avro.io.DatumReader<User>) MODEL$ + .createDatumReader(SCHEMA$); + + @Override + public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { /* Reads into this instance through READER$ using the decoder that SpecificData.getDecoder(in) returns. */ + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override + protected boolean hasCustomCoders() { /* NOTE(review): presumably tells Avro to use customEncode/customDecode below rather than its generic path - confirm against SpecificRecordBase. */ + return true; + } + + @Override + public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException { /* Writes name as a string, then age as an int. */ + out.writeString(this.name); + + out.writeInt(this.age); + + } + + @Override + public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException { /* When readFieldOrderIfDiff() returns null, name is read first and age second; otherwise the two fields are read in the order fieldOrder lists them. An existing Utf8 name is handed to readString (presumably for reuse - confirm). */ + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.name = in.readString(this.name instanceof Utf8 ? (Utf8) this.name : null); + + this.age = in.readInt(); + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.name = in.readString(this.name instanceof Utf8 ? 
(Utf8) this.name : null); + break; + + case 1: + this.age = in.readInt(); + break; + + default: /* any field position other than 0 or 1 is reported as a corrupt decoder */ + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} diff --git a/kafka-sample/kafka-registry-prod/src/main/resources/application.properties b/kafka-sample/kafka-registry-prod/src/main/resources/application.properties new file mode 100644 index 0000000..7f37bf4 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/src/main/resources/application.properties @@ -0,0 +1,99 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +# Kafka topic name +kafka.topic.name=test + +# How often messages should be generated and pushed to the Kafka topic +timer.period = 10000 +timer.delay = 10000 + +# Uncomment to use a Kafka instance without authentication +#camel.component.kafka.brokers=${brokers} + +# Uncomment to use a Kafka instance with SASL Plain +#camel.component.kafka.brokers=${brokers} +#camel.component.kafka.security-protocol=SASL_SSL +#camel.component.kafka.sasl-mechanism=PLAIN +#camel.component.kafka.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${id}" password="${secret}"; + +# Kafka instance with SASL OAuth Bearer (active configuration: replace the <...> placeholders below with your own values) +camel.component.kafka.brokers = <brokers> +camel.component.kafka.security-protocol = SASL_SSL +camel.component.kafka.sasl-mechanism = OAUTHBEARER +camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + oauth.client.id="<client_id>" \ + oauth.client.secret="<client_secret>" \ + oauth.token.endpoint.uri="https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token" ; +camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler +camel.component.kafka.additional-properties[apicurio.registry.url] = <registry_url> +camel.component.kafka.additional-properties[apicurio.auth.client.id] = <client_id> +camel.component.kafka.additional-properties[apicurio.auth.client.secret] = <client_secret> +camel.component.kafka.additional-properties[apicurio.auth.service.token.endpoint] = https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token +camel.component.kafka.additional-properties[apicurio.registry.avro-datum-provider] = io.apicurio.registry.serde.avro.ReflectAvroDatumProvider +camel.component.kafka.additional-properties[apicurio.registry.artifact-resolver-strategy] = 
io.apicurio.registry.serde.avro.strategy.RecordIdStrategy +camel.component.kafka.valueSerializer = io.apicurio.registry.serde.avro.AvroKafkaSerializer + + + +################################### +# Kubernetes specific +################################### +# secrets +#quarkus.kubernetes-config.enabled=true +#getting secrets while deploying to kubernetes +#quarkus.kubernetes-config.namespace=test +#quarkus.kubernetes-config.secrets.enabled=true +#quarkus.kubernetes-config.secrets=camel-kafka + +# creating container with jib +#quarkus.container-image.build=true +#quarkus.kubernetes.deployment-target=kubernetes +#quarkus.container-image.group=<YOUR_IMAGE_GROUP> +#quarkus.container-image.registry=<YOUR_REGISTRY_URL> + +# Uncomment to trust self signed certificates if they are presented by the Kubernetes API server +#quarkus.kubernetes-client.trust-certs=true + +# Uncomment to set resource limits +#quarkus.kubernetes.resources.requests.memory=64Mi +#quarkus.kubernetes.resources.requests.cpu=250m +#quarkus.kubernetes.resources.limits.memory=512Mi +#quarkus.kubernetes.resources.limits.cpu=1000m + +################################### +# OpenShift specific +################################### +# secrets +#quarkus.kubernetes-config.enabled=true +#getting secrets while deploying to kubernetes +#quarkus.kubernetes-config.namespace=test +#quarkus.kubernetes-config.secrets.enabled=true +#quarkus.kubernetes-config.secrets=camel-kafka + +# creating container for openshift +#quarkus.container-image.build=true +#quarkus.kubernetes.deployment-target=openshift + +# OpenShift +#quarkus.openshift.image-pull-policy=IfNotPresent + +# Uncomment to set resource limits +#quarkus.openshift.resources.requests.memory=64Mi +#quarkus.openshift.resources.requests.cpu=250m +#quarkus.openshift.resources.limits.memory=512Mi +#quarkus.openshift.resources.limits.cpu=1000m diff --git a/kafka-sample/kafka-registry-prod/user.avsc b/kafka-sample/kafka-registry-prod/user.avsc new file mode 100644 index 
0000000..7ecaa58 --- /dev/null +++ b/kafka-sample/kafka-registry-prod/user.avsc @@ -0,0 +1,15 @@ +{ + "namespace": "org.acme.kafka", + "type": "record", + "name": "User", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + } + ] +}