Hi Kara,

This is obviously not the correct place for this question.
I saw you also asked in the Apicurio-registry repo
<https://github.com/Apicurio/apicurio-registry/issues/6189>;
please wait for their response there.

Thanks.
Luke


On Mon, May 5, 2025 at 7:18 AM karan alang <karan.al...@gmail.com> wrote:

> I have Apicurio Schema Registry 3.0.7 installed on GKE. I'm trying to use it
> in Confluent compatibility mode, primarily because in production my Kafka
> producer is a C++ client, and the Apicurio serde is not supported for C++
> clients.
>
> Here is the Java code:
>
> ```
>
> import com.versa.apicurio.confluent.Employee;
> import com.versa.apicurio.confluent.DepartmentEnum;
> import org.apache.kafka.clients.producer.*;
> import org.apache.kafka.common.serialization.StringSerializer;
> import io.confluent.kafka.serializers.KafkaAvroSerializer;
>
> // Imports used by the snippets below (the truncated helper methods may need more)
> import java.io.OutputStream;
> import java.net.HttpURLConnection;
> import java.net.URL;
> import java.nio.charset.StandardCharsets;
> import java.security.cert.X509Certificate;
> import java.util.Properties;
> import java.util.Scanner;
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
>
> public class VersionBasedKafkaProducer {
>
>     private static final String REGISTRY_URL =
>             "https://apicurio-sr.vkp.versa-vani.com";
>     private static final String LETS_ENCRYPT_ROOT_URL =
>             "https://letsencrypt.org/certs/isrgrootx1.pem";
>     private static final String LETS_ENCRYPT_R11_URL =
>             "https://letsencrypt.org/certs/2024/r11.pem";
>     private static final String TRUSTSTORE_PASSWORD = "changeit";
>     private static final String TRUSTSTORE_PATH = "apicurio-truststore.jks";
>
>     /**
>      * Creates a custom truststore with Let's Encrypt certificates
>      * @return The path to the created truststore
>      */
>     private static String createCustomTruststore() throws Exception {
>         <truncated>
>     }
>
>     /**
>      * Downloads an X.509 certificate from a URL
>      */
>     private static X509Certificate downloadCertificate(String certUrl)
> throws Exception {
>         <truncated>
>     }
>
>     /**
>      * Get the Access Token for authentication
>      */
>     public static String getAccessToken(String tokenUrl, String
> clientId, String clientSecret) throws Exception {
>         String params = "grant_type=client_credentials"
>             + "&client_id=" + clientId
>             + "&client_secret=" + clientSecret;
>         URL url = new URL(tokenUrl);
>         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
>         conn.setRequestMethod("POST");
>         conn.setDoOutput(true);
>         conn.setRequestProperty("Content-Type",
> "application/x-www-form-urlencoded");
>         try (OutputStream os = conn.getOutputStream()) {
>             os.write(params.getBytes(StandardCharsets.UTF_8));
>         }
>         try (Scanner scanner = new Scanner(conn.getInputStream(),
> StandardCharsets.UTF_8.name())) {
>             String resp = scanner.useDelimiter("\\A").next();
>             return resp.split("\"access_token\":\"")[1].split("\"")[0];
>         }
>     }
>
>
> /**
>  * Verify schema exists and get information about it, returning the schema ID
>  */
> private static int verifySchemaAndGetId(String token) throws Exception {
>     System.out.println("\n=== Verifying schema in registry ===");
>
>     // Subject name with RecordNameStrategy
>     String subject = "com.versa.apicurio.confluent.Employee";
>
>     // Check if the subject exists
>     String subjectsUrl = String.format("%s/apis/ccompat/v7/subjects",
> REGISTRY_URL);
>     System.out.println("Checking all subjects at: " + subjectsUrl);
>
>     URL url = new URL(subjectsUrl);
>     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
>     conn.setRequestMethod("GET");
>     conn.setRequestProperty("Authorization", "Bearer " + token);
>
>     int responseCode = conn.getResponseCode();
>     System.out.println("Response Code: " + responseCode);
>
>     boolean subjectExists = false;
>
>     if (responseCode == 200) {
>         try (Scanner scanner = new Scanner(conn.getInputStream(),
> StandardCharsets.UTF_8.name())) {
>             String resp = scanner.useDelimiter("\\A").next();
>             System.out.println("All subjects: " + resp);
>
>             // Check if our subject is in the list
>             subjectExists = resp.contains("\"" + subject + "\"");
>             System.out.println("Subject '" + subject + "' exists: " +
> subjectExists);
>         }
>     } else {
>         System.out.println("Failed to retrieve subjects list. Response
> code: " + responseCode);
>         return -1;
>     }
>
>     if (!subjectExists) {
>         System.out.println("Subject '" + subject + "' does not exist
> in the registry!");
>         return -1;
>     }
>
>     // Get versions
>     String versionsUrl =
> String.format("%s/apis/ccompat/v7/subjects/%s/versions", REGISTRY_URL,
> subject);
>     System.out.println("Fetching versions at: " + versionsUrl);
>
>     URL versionsUrlObj = new URL(versionsUrl);
>     HttpURLConnection versionsConn = (HttpURLConnection)
> versionsUrlObj.openConnection();
>     versionsConn.setRequestMethod("GET");
>     versionsConn.setRequestProperty("Authorization", "Bearer " + token);
>
>     int versionsResponseCode = versionsConn.getResponseCode();
>     String highestVersion = "1";
>
>     if (versionsResponseCode == 200) {
>         try (Scanner scanner = new
> Scanner(versionsConn.getInputStream(), StandardCharsets.UTF_8.name()))
> {
>             String resp = scanner.useDelimiter("\\A").next();
>             System.out.println("Available versions: " + resp);
>
>             if (resp.equals("[]") || resp.trim().isEmpty()) {
>                 System.out.println("Warning: No versions found for
> this subject!");
>                 return -1;
>             } else {
>                 // Find the highest version number
>                 Pattern pattern = Pattern.compile("\\d+");
>                 Matcher matcher = pattern.matcher(resp);
>                 int maxVersion = 0;
>                 while (matcher.find()) {
>                     try {
>                         int version = Integer.parseInt(matcher.group());
>                         if (version > maxVersion) {
>                             maxVersion = version;
>                             highestVersion = matcher.group();
>                         }
>                     } catch (NumberFormatException e) {
>                         // Skip if not a valid number
>                     }
>                 }
>                 System.out.println("Highest version found: " +
> highestVersion);
>             }
>         }
>     } else {
>         System.out.println("Failed to list versions. Response code: "
> + versionsResponseCode);
>         return -1;
>     }
>
>     // Get the specific version
>     String versionUrl =
> String.format("%s/apis/ccompat/v7/subjects/%s/versions/%s",
>             REGISTRY_URL, subject, highestVersion);
>     System.out.println("Fetching schema at: " + versionUrl);
>
>     URL specificVersionUrl = new URL(versionUrl);
>     HttpURLConnection versionConn = (HttpURLConnection)
> specificVersionUrl.openConnection();
>     versionConn.setRequestMethod("GET");
>     versionConn.setRequestProperty("Authorization", "Bearer " + token);
>
>     int versionResponseCode = versionConn.getResponseCode();
>     int schemaId = -1;
>
>     if (versionResponseCode == 200) {
>         try (Scanner scanner = new
> Scanner(versionConn.getInputStream(), StandardCharsets.UTF_8.name()))
> {
>             String resp = scanner.useDelimiter("\\A").next();
>             System.out.println("Version " + highestVersion + " schema
> (raw): " + resp);
>
>             // Extract schema ID
>             if (resp.contains("\"id\":")) {
>                 String idStr =
> resp.split("\"id\":")[1].split(",")[0].trim();
>                 try {
>                     schemaId = Integer.parseInt(idStr);
>                     System.out.println("Schema ID: " + schemaId);
>                 } catch (NumberFormatException e) {
>                     System.out.println("Failed to parse schema ID: " +
> idStr);
>                     return -1;
>                 }
>             }
>
>             // Extract and pretty print the schema
>             try {
>                 String schemaJson =
> resp.split("\"schema\":\"")[1].split("\",\"schemaType\"")[0];
>                 // Replace escaped quotes with regular quotes
>                 schemaJson = schemaJson.replace("\\\"", "\"");
>                 // Replace escaped backslashes with regular backslashes
>                 schemaJson = schemaJson.replace("\\\\", "\\");
>                 System.out.println("\nSchema content (prettified):\n"
> + schemaJson);
>             } catch (Exception e) {
>                 System.out.println("Failed to prettify schema: " +
> e.getMessage());
>             }
>         }
>         return schemaId;
>     } else {
>         System.out.println("Failed to get version. Response code: " +
> versionResponseCode);
>         return -1;
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     // Create truststore for SSL
>     String truststorePath = createCustomTruststore();
>
>     // Get token
>     String tokenUrl =
>             "https://keycloak.vkp.versa-vani.com/realms/readonly-realm/protocol/openid-connect/token";
>     String clientId = "apicurio-registry";
>     String clientSecret = "<secret>";
>     String token = getAccessToken(tokenUrl, clientId, clientSecret);
>     System.out.println("Got access token: " + token.substring(0, 15) +
> "...");
>
>     // Get schema ID during verification
>     int schemaId = verifySchemaAndGetId(token);
>     // System.out.println("schemaId",  schemaId);
>     if (schemaId <= 0) {
>         System.out.println("Failed to get valid schema ID. Exiting.");
>         return;
>     }
>
>     // Add direct schema ID check
>     String schemaIdUrl =
> String.format("%s/apis/ccompat/v7/schemas/ids/%d", REGISTRY_URL,
> schemaId);
>     System.out.println("\n=== Testing direct schema retrieval by ID ===");
>     System.out.println("Testing URL: " + schemaIdUrl);
>
>     URL schemaIdUrlObj = new URL(schemaIdUrl);
>     HttpURLConnection schemaIdConn = (HttpURLConnection)
> schemaIdUrlObj.openConnection();
>     schemaIdConn.setRequestMethod("GET");
>     schemaIdConn.setRequestProperty("Authorization", "Bearer " + token);
>
>     int schemaIdResponseCode = schemaIdConn.getResponseCode();
>     System.out.println("Schema ID check response code: " +
> schemaIdResponseCode);
>
>     if (schemaIdResponseCode == 200) {
>         try (Scanner scanner = new
> Scanner(schemaIdConn.getInputStream(), StandardCharsets.UTF_8.name()))
> {
>             String resp = scanner.useDelimiter("\\A").next();
>             System.out.println("Schema by ID response: " + resp);
>         }
>     } else {
>         System.out.println("Failed to get schema by ID. Response code:
> " + schemaIdResponseCode);
>     }
>
>     // Set up Kafka producer
>     Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP>:<port>");
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> KafkaAvroSerializer.class.getName());
>
>     // Schema Registry config - Updated with dynamic schema ID
>     props.put("schema.registry.url", REGISTRY_URL + "/apis/ccompat/v7");
>     props.put("bearer.auth.credentials.source", "USER_INFO");
>     props.put("bearer.auth.token", token);
>     props.put("value.subject.name.strategy",
> "io.confluent.kafka.serializers.subject.RecordNameStrategy");
>     props.put("auto.register.schemas", "false");
>     props.put("apicurio.registry.as-confluent", "true");
>     props.put("apicurio.registry.use-id", "contentId");
>
>     // props.put("value.subject.name.strategy",
> "io.confluent.kafka.serializers.subject.TopicNameStrategy");
>     // props.put("auto.register.schemas", "true");  //
>
>     // Use the dynamically retrieved schema ID
>     props.put("use.schema.id", String.valueOf(schemaId));
>
>     // Additional helpful configuration
>     props.put("specific.avro.reader", "true");
>     props.put("use.latest.version", "true");
>
>     // SSL settings for Kafka
>     props.put("ssl.truststore.location",
>
> "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc-tf/versa-kafka-poc-tf-cluster-ca-cert/versa-kafka-poc-tf-ca.p12");
>     props.put("ssl.truststore.password", "pwd");
>     props.put("ssl.truststore.type", "PKCS12");
>     props.put("ssl.keystore.location",
>
> "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc/syslog-vani-prefix/syslog-vani-prefix.p12");
>     props.put("ssl.keystore.password", "pwd");
>     props.put("ssl.keystore.type", "PKCS12");
>     props.put("security.protocol", "SSL");
>
>     // Schema Registry SSL settings
>     props.put("schema.registry.ssl.truststore.location", truststorePath);
>     props.put("schema.registry.ssl.truststore.password",
> TRUSTSTORE_PASSWORD);
>     props.put("schema.registry.ssl.truststore.type", "JKS");
>     props.put("schema.registry.ssl.endpoint.identification.algorithm",
> "HTTPS");
>
>     props.put("schema.reflection", "false");
>     props.put("specific.avro.reader", "true");
>     props.put("schema.registry.group.id", "default");
>
>     // Add debugging parameter
>     props.put("debug", "all");
>
>     // For Apicurio 3.0.7, schema may be under a group
>     props.put("schema.registry.group.id", "default");
>
>     // Enhanced debugging
>
> System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.serializers",
> "TRACE");
>
> System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.schemaregistry",
> "TRACE");
>
>     // Using your original topic
>     String topic = "<topic>";
>
>     // Update to your main method where you create and send messages
> try (Producer<String, Employee> producer = new KafkaProducer<>(props)) {
>     System.out.println("Producer created successfully. Sending
> messages to: " + topic);
>
>     System.out.println("Sending employee records...");
>
>     // Create employees with null values for new_col2 and new_col3
>     Employee[] employees = {
>         Employee.newBuilder()
>                 .setId(1)
>                 .setName("John Doe")
>                 .setAge(30)
>                 .setSalary(75000.0f)
>                 .setDepartment(DepartmentEnum.ENGINEERING)
>                 .setEmail(null)
>                 .setNewCol(null)
>                 .setNewCol2(null)
>                 .setNewCol3(null)
>                 .build(),
>         Employee.newBuilder()
>                 .setId(2)
>                 .setName("Jane Smith")
>                 .setAge(28)
>                 .setSalary(85000.0f)
>                 .setDepartment(DepartmentEnum.HR)
>                 .setEmail(null)
>                 .setNewCol(null)
>                 .setNewCol2(null)
>                 .setNewCol3(null)
>                 .build(),
>         Employee.newBuilder()
>                 .setId(3)
>                 .setName("Bob Johnson")
>                 .setSalary(65000.0f)
>                 .setDepartment(DepartmentEnum.SALES)
>                 .setAge(null)
>                 .setEmail(null)
>                 .setNewCol(null)
>                 .setNewCol2(null)
>                 .setNewCol3(null)
>                 .build()
>     };
>
>     // Send each record
>     for (Employee emp : employees) {
>         System.out.println("Sending record for " + emp.getName() + "
> with ID: " + emp.getId());
>         ProducerRecord<String, Employee> record = new
> ProducerRecord<>(topic, String.valueOf(emp.getId()), emp);
>
>         // Use asynchronous sending with a callback instead of synchronous
> get()
>         producer.send(record, new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e)
> {
>                 if (e != null) {
>                     System.err.println("Error sending record for " +
> emp.getName() + ":");
>                     e.printStackTrace();
>                 } else {
>                     System.out.println("Successfully sent " +
> emp.getName() + " to partition " +
>                             metadata.partition() + " at offset " +
> metadata.offset());
>                 }
>             }
>         });
>
>         // Small delay to see logs clearly
>         Thread.sleep(100);
>     }
>
>     // Ensure all records are sent before closing producer
>     producer.flush();
>     System.out.println("All messages sent successfully");
> } // end try-with-resources
> } // end main()
> } // end class VersionBasedKafkaProducer
>
> ```
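>
> To isolate this, here is a minimal, hypothetical sketch (my assumption: a
> directly constructed CachedSchemaRegistryClient honors the same
> schema.registry.ssl.* and bearer.auth.* keys that the serializer passes
> through) that performs only the schema lookup the serializer does, outside
> the producer:
>
> ```
>
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
>
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical diagnostic, not part of the producer above: reproduce only the
> // registry lookup (getSchemaBySubjectAndId) that appears in the stack trace.
> public class SchemaIdLookupCheck {
>     public static void main(String[] args) throws Exception {
>         String registryUrl = "https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7";
>         String token = "<access token>";   // obtained as in getAccessToken()
>         int schemaId = 2;                  // value returned by verifySchemaAndGetId()
>
>         // Reusing the same keys as the producer config above; whether the
>         // directly created client reads them identically is an assumption.
>         Map<String, Object> cfg = new HashMap<>();
>         cfg.put("bearer.auth.credentials.source", "USER_INFO");
>         cfg.put("bearer.auth.token", token);
>         cfg.put("schema.registry.ssl.truststore.location", "apicurio-truststore.jks");
>         cfg.put("schema.registry.ssl.truststore.password", "changeit");
>         cfg.put("schema.registry.ssl.truststore.type", "JKS");
>
>         CachedSchemaRegistryClient client =
>                 new CachedSchemaRegistryClient(registryUrl, 10, cfg);
>
>         // If this throws the same 50005 RestClientException, the problem is in
>         // the lookup itself rather than in the Kafka producer.
>         System.out.println(client.getSchemaBySubjectAndId(
>                 "com.versa.apicurio.confluent.Employee", schemaId));
>     }
> }
> ```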
>
> Here is the error:
>
> ```
>
> (base) Karans-MacBook-Pro:apicurio_confluent karanalang$
> (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ mvn exec:java
> -Dexec.mainClass="com.versa.apicurio.confluent.VersionBasedKafkaProducer"
> -Dlog4j.debug
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ----------< com.versa.apicurio.confluent:apicurio_confluent
> >-----------
> [INFO] Building apicurio_confluent 1.0-SNAPSHOT
> [INFO] --------------------------------[ jar
> ]---------------------------------
> [INFO]
> [INFO] --- exec-maven-plugin:3.5.0:java (default-cli) @ apicurio_confluent
> ---
> Creating custom truststore with Let's Encrypt certificates...
> Downloading Let's Encrypt Root certificate...
> Downloading Let's Encrypt R11 certificate...
> Custom truststore created at:
>
> /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks
> Got access token: eyJhbGciOiJSUzI...
>
> === Verifying schema in registry ===
> Checking all subjects at:
> https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects
> Response Code: 200
> All subjects: ["com.versa.apicurio.confluent.Employee"]
> Subject 'com.versa.apicurio.confluent.Employee' exists: true
> Fetching versions at:
>
> https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions
> Available versions: [1,2,3,4]
> Highest version found: 4
> Fetching schema at:
>
> https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions/4
> Version 4 schema (raw):
>
> {"subject":"com.versa.apicurio.confluent.Employee","version":4,"id":2,"schema":"{\"type\":
> \"record\", \"name\": \"Employee\", \"namespace\":
> \"com.versa.apicurio.confluent\", \"fields\": [{\"name\": \"id\",
> \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"},
> {\"name\": \"salary\", \"type\": [\"null\", \"float\"], \"default\":
> null}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\":
> null}, {\"name\": \"department\", \"type\": {\"type\": \"enum\",
> \"name\": \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\",
> \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"],
> \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\",
> \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\":
> [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\",
> \"type\": [\"null\", \"string\"], \"default\":
> null}]}","schemaType":"AVRO","references":[]}
> Schema ID: 2
>
> Schema content (prettified):
> {"type": "record", "name": "Employee", "namespace":
> "com.versa.apicurio.confluent", "fields": [{"name": "id", "type":
> "int"}, {"name": "name", "type": "string"}, {"name": "salary", "type":
> ["null", "float"], "default": null}, {"name": "age", "type": ["null",
> "int"], "default": null}, {"name": "department", "type": {"type":
> "enum", "name": "DepartmentEnum", "symbols": ["HR", "ENGINEERING",
> "SALES"]}}, {"name": "email", "type": ["null", "string"], "default":
> null}, {"name": "new_col", "type": ["null", "string"], "default":
> null}, {"name": "new_col2", "type": ["null", "string"], "default":
> null}, {"name": "new_col3", "type": ["null", "string"], "default":
> null}]}
>
> === Testing direct schema retrieval by ID ===
> Testing URL:
> https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/schemas/ids/2
> Schema ID check response code: 200
> Schema by ID response: {"schema":"{\"type\": \"record\", \"name\":
> \"Employee\", \"namespace\": \"com.versa.apicurio.confluent\",
> \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\":
> \"name\", \"type\": \"string\"}, {\"name\": \"salary\", \"type\":
> [\"null\", \"float\"], \"default\": null}, {\"name\": \"age\",
> \"type\": [\"null\", \"int\"], \"default\": null}, {\"name\":
> \"department\", \"type\": {\"type\": \"enum\", \"name\":
> \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\",
> \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"],
> \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\",
> \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\":
> [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\",
> \"type\": [\"null\", \"string\"], \"default\":
> null}]}","schemaType":"AVRO","references":[]}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/Users/karanalang/.m2/repository/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/Users/karanalang/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> log4j: Trying to find [log4j.xml] using context classloader
> org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d.
> log4j: Trying to find [log4j.xml] using
> org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d
> class loader.
> log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
> log4j: Trying to find [log4j.properties] using context classloader
> org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d.
> log4j: Using URL
>
> [jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties]
> for automatic log4j configuration.
> log4j: Reading configuration from URL
>
> jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties
> log4j: Parsing for [root] with value=[INFO, stdout].
> log4j: Level token is [INFO].
> log4j: Category root set to INFO
> log4j: Parsing appender named "stdout".
> log4j: Parsing layout options for "stdout".
> log4j: Setting property [conversionPattern] to [[%d] %p %m (%c:%L)%n].
> log4j: End of parsing for "stdout".
> log4j: Parsed "stdout" options.
> log4j: Parsing for [org.apache.directory] with value=[ERROR, stdout].
> log4j: Level token is [ERROR].
> log4j: Category org.apache.directory set to ERROR
> log4j: Parsing appender named "stdout".
> log4j: Appender "stdout" was already parsed.
> log4j: Handling log4j.additivity.org.apache.directory=[null]
> log4j: Parsing for [org.apache.zookeeper] with value=[ERROR, stdout].
> log4j: Level token is [ERROR].
> log4j: Category org.apache.zookeeper set to ERROR
> log4j: Parsing appender named "stdout".
> log4j: Appender "stdout" was already parsed.
> log4j: Handling log4j.additivity.org.apache.zookeeper=[null]
> log4j: Parsing for [org.apache.kafka] with value=[ERROR, stdout].
> log4j: Level token is [ERROR].
> log4j: Category org.apache.kafka set to ERROR
> log4j: Parsing appender named "stdout".
> log4j: Appender "stdout" was already parsed.
> log4j: Handling log4j.additivity.org.apache.kafka=[null]
> log4j: Parsing for [kafka] with value=[ERROR, stdout].
> log4j: Level token is [ERROR].
> log4j: Category kafka set to ERROR
> log4j: Parsing appender named "stdout".
> log4j: Appender "stdout" was already parsed.
> log4j: Handling log4j.additivity.kafka=[null]
> log4j: Finished configuring.
> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
> [2025-05-04 15:37:20,915] INFO KafkaAvroSerializerConfig values:
>     auto.register.schemas = false
>     avro.reflection.allow.null = false
>     avro.remove.java.properties = false
>     avro.use.logical.type.converters = false
>     basic.auth.credentials.source = URL
>     basic.auth.user.info = [hidden]
>     bearer.auth.cache.expiry.buffer.seconds = 300
>     bearer.auth.client.id = null
>     bearer.auth.client.secret = null
>     bearer.auth.credentials.source = USER_INFO
>     bearer.auth.custom.provider.class = null
>     bearer.auth.identity.pool.id = null
>     bearer.auth.issuer.endpoint.url = null
>     bearer.auth.logical.cluster = null
>     bearer.auth.scope = null
>     bearer.auth.scope.claim.name = scope
>     bearer.auth.sub.claim.name = sub
>     bearer.auth.token = [hidden]
>     context.name.strategy = class
> io.confluent.kafka.serializers.context.NullContextNameStrategy
>     http.connect.timeout.ms = 60000
>     http.read.timeout.ms = 60000
>     id.compatibility.strict = true
>     key.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>     latest.cache.size = 1000
>     latest.cache.ttl.sec = -1
>     latest.compatibility.strict = true
>     max.schemas.per.subject = 1000
>     normalize.schemas = false
>     proxy.host =
>     proxy.port = -1
>     rule.actions = []
>     rule.executors = []
>     rule.service.loader.enable = true
>     schema.format = null
>     schema.reflection = false
>     schema.registry.basic.auth.user.info = [hidden]
>     schema.registry.ssl.cipher.suites = null
>     schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>     schema.registry.ssl.endpoint.identification.algorithm = HTTPS
>     schema.registry.ssl.engine.factory.class = null
>     schema.registry.ssl.key.password = null
>     schema.registry.ssl.keymanager.algorithm = SunX509
>     schema.registry.ssl.keystore.certificate.chain = null
>     schema.registry.ssl.keystore.key = null
>     schema.registry.ssl.keystore.location = null
>     schema.registry.ssl.keystore.password = null
>     schema.registry.ssl.keystore.type = JKS
>     schema.registry.ssl.protocol = TLSv1.3
>     schema.registry.ssl.provider = null
>     schema.registry.ssl.secure.random.implementation = null
>     schema.registry.ssl.trustmanager.algorithm = PKIX
>     schema.registry.ssl.truststore.certificates = null
>     schema.registry.ssl.truststore.location =
>
> /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks
>     schema.registry.ssl.truststore.password = [hidden]
>     schema.registry.ssl.truststore.type = JKS
>     schema.registry.url =
> [https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7]
>     use.latest.version = true
>     use.latest.with.metadata = null
>     use.schema.id = 2
>     value.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.RecordNameStrategy
>  (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:370)
> Producer created successfully. Sending messages to:
> syslog.ueba-nov.v1.nov.nov
> Sending employee records...
> Sending record for John Doe with ID: 1
> [WARNING]
> org.apache.kafka.common.errors.SerializationException: Error
> retrieving schema
>
> ID{"type":"record","name":"Employee","namespace":"com.versa.apicurio.confluent","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"salary","type":["null","float"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"department","type":{"type":"enum","name":"DepartmentEnum","symbols":["HR","ENGINEERING","SALES"]}},{"name":"email","type":["null","string"],"default":null},{"name":"new_col","type":["null","string"],"default":null},{"name":"new_col2","type":["null","string"],"default":null},{"name":"new_col3","type":["null","string"],"default":null}]}
>     at
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException
> (AbstractKafkaSchemaSerDe.java:809)
>     at
> io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl
> (AbstractKafkaAvroSerializer.java:176)
>     at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize
> (KafkaAvroSerializer.java:68)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend
> (KafkaProducer.java:1000)
>     at org.apache.kafka.clients.producer.KafkaProducer.send
> (KafkaProducer.java:947)
>     at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main
> (VersionBasedKafkaProducer.java:507)
>     at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375)
>     at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364)
>     at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0
> (ExecJavaMojo.java:286)
>     at java.lang.Thread.run (Thread.java:833)
> Caused by:
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> Error; error code: 50005
>     at
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest
> (RestService.java:336)
>     at
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest
> (RestService.java:409)
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId
> (RestService.java:907)
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId
> (RestService.java:880)
>     at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry
> (CachedSchemaRegistryClient.java:333)
>     at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId
> (CachedSchemaRegistryClient.java:464)
>     at
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getSchemaBySubjectAndId
> (AbstractKafkaSchemaSerDe.java:534)
>     at
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId
> (AbstractKafkaSchemaSerDe.java:540)
>     at
> io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl
> (AbstractKafkaAvroSerializer.java:130)
>     at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize
> (KafkaAvroSerializer.java:68)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend
> (KafkaProducer.java:1000)
>     at org.apache.kafka.clients.producer.KafkaProducer.send
> (KafkaProducer.java:947)
>     at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main
> (VersionBasedKafkaProducer.java:507)
>     at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375)
>     at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364)
>     at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0
> (ExecJavaMojo.java:286)
>     at java.lang.Thread.run (Thread.java:833)
> [INFO]
> ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
>
> ```
>
>
> In verifySchemaAndGetId() I'm able to get the schema ID and the schema
> details, but when I use the producer to publish messages with that schema,
> it fails.
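>
> From the stack trace, the serializer goes through getSchemaBySubjectAndId(),
> so I assume the REST call it issues may carry a subject query parameter that
> my manual check above does not. Here is a small sketch to compare both forms
> (the parameter name is my assumption from the Confluent client, not verified):
>
> ```
>
> // Hypothetical diagnostic using the same REGISTRY_URL, token and schemaId as above.
> // Requires java.net.URLEncoder in addition to the imports already present.
> private static void compareSchemaIdLookups(String token, int schemaId) throws Exception {
>     String plain = String.format("%s/apis/ccompat/v7/schemas/ids/%d", REGISTRY_URL, schemaId);
>     String withSubject = plain + "?subject=" + URLEncoder.encode(
>             "com.versa.apicurio.confluent.Employee", StandardCharsets.UTF_8.name());
>
>     for (String u : new String[] { plain, withSubject }) {
>         HttpURLConnection conn = (HttpURLConnection) new URL(u).openConnection();
>         conn.setRequestProperty("Authorization", "Bearer " + token);
>         System.out.println(u + " -> HTTP " + conn.getResponseCode());
>     }
> }
> ```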
>
> Any ideas on this?
>
> tia!
>
>
> Here is the Stack Overflow link:
>
> https://stackoverflow.com/questions/79606146/apicurio-schema-registry-3-0-7-producer-failing-in-confluent-compatibility-mod
>
