I have Apicurio Schema Registry 3.0.7 installed on GKE. I'm trying to use this in Confluent Compatibility mode .. primarily because on production, my Kafka Producer is C++ client, and Apicurio serde is not supported for C++ client.
Here is the Java code: ``` import com.versa.apicurio.confluent.Employee; import com.versa.apicurio.confluent.DepartmentEnum; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class VersionBasedKafkaProducer { private static final String REGISTRY_URL = "https://apicurio-sr.vkp.versa-vani.com"; private static final String LETS_ENCRYPT_ROOT_URL = "https://letsencrypt.org/certs/isrgrootx1.pem"; private static final String LETS_ENCRYPT_R11_URL = "https://letsencrypt.org/certs/2024/r11.pem"; private static final String TRUSTSTORE_PASSWORD = "changeit"; private static final String TRUSTSTORE_PATH = "apicurio-truststore.jks"; /** * Creates a custom truststore with Let's Encrypt certificates * @return The path to the created truststore */ private static String createCustomTruststore() throws Exception { <truncated> } /** * Downloads an X.509 certificate from a URL */ private static X509Certificate downloadCertificate(String certUrl) throws Exception { <truncated> } /** * Get the Access Token for authentication */ public static String getAccessToken(String tokenUrl, String clientId, String clientSecret) throws Exception { String params = "grant_type=client_credentials" + "&client_id=" + clientId + "&client_secret=" + clientSecret; URL url = new URL(tokenUrl); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("POST"); conn.setDoOutput(true); conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); try (OutputStream os = conn.getOutputStream()) { os.write(params.getBytes(StandardCharsets.UTF_8)); } try (Scanner scanner = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) { String resp = scanner.useDelimiter("\\A").next(); return resp.split("\"access_token\":\"")[1].split("\"")[0]; } } apiv /** * Verify schema exists and get information about it, returning the schema ID */ private static int verifySchemaAndGetId(String token) throws Exception { System.out.println("\n=== Verifying schema in registry ==="); // Subject name with RecordNameStrategy String subject = "com.versa.apicurio.confluent.Employee"; // Check if the subject exists String subjectsUrl = String.format("%s/apis/ccompat/v7/subjects", REGISTRY_URL); System.out.println("Checking all subjects at: " + subjectsUrl); URL url = new URL(subjectsUrl); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("GET"); conn.setRequestProperty("Authorization", "Bearer " + token); int responseCode = conn.getResponseCode(); System.out.println("Response Code: " + responseCode); boolean subjectExists = false; if (responseCode == 200) { try (Scanner scanner = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) { String resp = scanner.useDelimiter("\\A").next(); System.out.println("All subjects: " + resp); // Check if our subject is in the list subjectExists = resp.contains("\"" + subject + "\""); System.out.println("Subject '" + subject + "' exists: " + subjectExists); } } else { System.out.println("Failed to retrieve subjects list. Response code: " + responseCode); return -1; } if (!subjectExists) { System.out.println("Subject '" + subject + "' does not exist in the registry!"); return -1; } // Get versions String versionsUrl = String.format("%s/apis/ccompat/v7/subjects/%s/versions", REGISTRY_URL, subject); System.out.println("Fetching versions at: " + versionsUrl); URL versionsUrlObj = new URL(versionsUrl); HttpURLConnection versionsConn = (HttpURLConnection) versionsUrlObj.openConnection(); versionsConn.setRequestMethod("GET"); versionsConn.setRequestProperty("Authorization", "Bearer " + token); int versionsResponseCode = versionsConn.getResponseCode(); String highestVersion = "1"; if (versionsResponseCode == 200) { try (Scanner scanner = new Scanner(versionsConn.getInputStream(), StandardCharsets.UTF_8.name())) { String resp = scanner.useDelimiter("\\A").next(); System.out.println("Available versions: " + resp); if (resp.equals("[]") || resp.trim().isEmpty()) { System.out.println("Warning: No versions found for this subject!"); return -1; } else { // Find the highest version number Pattern pattern = Pattern.compile("\\d+"); Matcher matcher = pattern.matcher(resp); int maxVersion = 0; while (matcher.find()) { try { int version = Integer.parseInt(matcher.group()); if (version > maxVersion) { maxVersion = version; highestVersion = matcher.group(); } } catch (NumberFormatException e) { // Skip if not a valid number } } System.out.println("Highest version found: " + highestVersion); } } } else { System.out.println("Failed to list versions. Response code: " + versionsResponseCode); return -1; } // Get the specific version String versionUrl = String.format("%s/apis/ccompat/v7/subjects/%s/versions/%s", REGISTRY_URL, subject, highestVersion); System.out.println("Fetching schema at: " + versionUrl); URL specificVersionUrl = new URL(versionUrl); HttpURLConnection versionConn = (HttpURLConnection) specificVersionUrl.openConnection(); versionConn.setRequestMethod("GET"); versionConn.setRequestProperty("Authorization", "Bearer " + token); int versionResponseCode = versionConn.getResponseCode(); int schemaId = -1; if (versionResponseCode == 200) { try (Scanner scanner = new Scanner(versionConn.getInputStream(), StandardCharsets.UTF_8.name())) { String resp = scanner.useDelimiter("\\A").next(); System.out.println("Version " + highestVersion + " schema (raw): " + resp); // Extract schema ID if (resp.contains("\"id\":")) { String idStr = resp.split("\"id\":")[1].split(",")[0].trim(); try { schemaId = Integer.parseInt(idStr); System.out.println("Schema ID: " + schemaId); } catch (NumberFormatException e) { System.out.println("Failed to parse schema ID: " + idStr); return -1; } } // Extract and pretty print the schema try { String schemaJson = resp.split("\"schema\":\"")[1].split("\",\"schemaType\"")[0]; // Replace escaped quotes with regular quotes schemaJson = schemaJson.replace("\\\"", "\""); // Replace escaped backslashes with regular backslashes schemaJson = schemaJson.replace("\\\\", "\\"); System.out.println("\nSchema content (prettified):\n" + schemaJson); } catch (Exception e) { System.out.println("Failed to prettify schema: " + e.getMessage()); } } return schemaId; } else { System.out.println("Failed to get version. Response code: " + versionResponseCode); return -1; } } public static void main(String[] args) throws Exception { // Create truststore for SSL String truststorePath = createCustomTruststore(); // Get token String tokenUrl = "https://keycloak.vkp.versa-vani.com/realms/readonly-realm/protocol/openid-connect/token"; String clientId = "apicurio-registry"; String clientSecret = "<secret>"; String token = getAccessToken(tokenUrl, clientId, clientSecret); System.out.println("Got access token: " + token.substring(0, 15) + "..."); // Get schema ID during verification int schemaId = verifySchemaAndGetId(token); // System.out.println("schemaId", schemaId); if (schemaId <= 0) { System.out.println("Failed to get valid schema ID. Exiting."); return; } // Add direct schema ID check String schemaIdUrl = String.format("%s/apis/ccompat/v7/schemas/ids/%d", REGISTRY_URL, schemaId); System.out.println("\n=== Testing direct schema retrieval by ID ==="); System.out.println("Testing URL: " + schemaIdUrl); URL schemaIdUrlObj = new URL(schemaIdUrl); HttpURLConnection schemaIdConn = (HttpURLConnection) schemaIdUrlObj.openConnection(); schemaIdConn.setRequestMethod("GET"); schemaIdConn.setRequestProperty("Authorization", "Bearer " + token); int schemaIdResponseCode = schemaIdConn.getResponseCode(); System.out.println("Schema ID check response code: " + schemaIdResponseCode); if (schemaIdResponseCode == 200) { try (Scanner scanner = new Scanner(schemaIdConn.getInputStream(), StandardCharsets.UTF_8.name())) { String resp = scanner.useDelimiter("\\A").next(); System.out.println("Schema by ID response: " + resp); } } else { System.out.println("Failed to get schema by ID. Response code: " + schemaIdResponseCode); } // Set up Kafka producer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP>:<port>"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); // Schema Registry config - Updated with dynamic schema ID props.put("schema.registry.url", REGISTRY_URL + "/apis/ccompat/v7"); props.put("bearer.auth.credentials.source", "USER_INFO"); props.put("bearer.auth.token", token); props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy"); props.put("auto.register.schemas", "false"); props.put("apicurio.registry.as-confluent", "true"); props.put("apicurio.registry.use-id", "contentId"); // props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicNameStrategy"); // props.put("auto.register.schemas", "true"); // // Use the dynamically retrieved schema ID props.put("use.schema.id", String.valueOf(schemaId)); // Additional helpful configuration props.put("specific.avro.reader", "true"); props.put("use.latest.version", "true"); // SSL settings for Kafka props.put("ssl.truststore.location", "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc-tf/versa-kafka-poc-tf-cluster-ca-cert/versa-kafka-poc-tf-ca.p12"); props.put("ssl.truststore.password", "pwd"); props.put("ssl.truststore.type", "PKCS12"); props.put("ssl.keystore.location", "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc/syslog-vani-prefix/syslog-vani-prefix.p12"); props.put("ssl.keystore.password", "pwd"); props.put("ssl.keystore.type", "PKCS12"); props.put("security.protocol", "SSL"); // Schema Registry SSL settings props.put("schema.registry.ssl.truststore.location", truststorePath); props.put("schema.registry.ssl.truststore.password", TRUSTSTORE_PASSWORD); props.put("schema.registry.ssl.truststore.type", "JKS"); props.put("schema.registry.ssl.endpoint.identification.algorithm", "HTTPS"); props.put("schema.reflection", "false"); props.put("specific.avro.reader", "true"); props.put("schema.registry.group.id", "default"); // Add debugging parameter props.put("debug", "all"); // For Apicurio 3.0.7, schema may be under a group props.put("schema.registry.group.id", "default"); // Enhanced debugging System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.serializers", "TRACE"); System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.schemaregistry", "TRACE"); // Using your original topic String topic = "<topic>"; // Update to your main method where you create and send messages try (Producer<String, Employee> producer = new KafkaProducer<>(props)) { System.out.println("Producer created successfully. Sending messages to: " + topic); System.out.println("Sending employee records..."); // Create employees with null values for new_col2 and new_col3 Employee[] employees = { Employee.newBuilder() .setId(1) .setName("John Doe") .setAge(30) .setSalary(75000.0f) .setDepartment(DepartmentEnum.ENGINEERING) .setEmail(null) .setNewCol(null) .setNewCol2(null) .setNewCol3(null) .build(), Employee.newBuilder() .setId(2) .setName("Jane Smith") .setAge(28) .setSalary(85000.0f) .setDepartment(DepartmentEnum.HR) .setEmail(null) .setNewCol(null) .setNewCol2(null) .setNewCol3(null) .build(), Employee.newBuilder() .setId(3) .setName("Bob Johnson") .setSalary(65000.0f) .setDepartment(DepartmentEnum.SALES) .setAge(null) .setEmail(null) .setNewCol(null) .setNewCol2(null) .setNewCol3(null) .build() }; // Send each record for (Employee emp : employees) { System.out.println("Sending record for " + emp.getName() + " with ID: " + emp.getId()); ProducerRecord<String, Employee> record = new ProducerRecord<>(topic, String.valueOf(emp.getId()), emp); // Use asynchronous sending with a callback instead of synchronous get() producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.err.println("Error sending record for " + emp.getName() + ":"); e.printStackTrace(); } else { System.out.println("Successfully sent " + emp.getName() + " to partition " + metadata.partition() + " at offset " + metadata.offset()); } } }); // Small delay to see logs clearly Thread.sleep(100); } // Ensure all records are sent before closing producer producer.flush(); System.out.println("All messages sent successfully"); } ``` here is the error : ``` (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ mvn exec:java -Dexec.mainClass="com.versa.apicurio.confluent.VersionBasedKafkaProducer" -Dlog4j.debug [INFO] Scanning for projects... [INFO] [INFO] ----------< com.versa.apicurio.confluent:apicurio_confluent >----------- [INFO] Building apicurio_confluent 1.0-SNAPSHOT [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.5.0:java (default-cli) @ apicurio_confluent --- Creating custom truststore with Let's Encrypt certificates... Downloading Let's Encrypt Root certificate... Downloading Let's Encrypt R11 certificate... Custom truststore created at: /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks Got access token: eyJhbGciOiJSUzI... === Verifying schema in registry === Checking all subjects at: https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects Response Code: 200 All subjects: ["com.versa.apicurio.confluent.Employee"] Subject 'com.versa.apicurio.confluent.Employee' exists: true Fetching versions at: https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions Available versions: [1,2,3,4] Highest version found: 4 Fetching schema at: https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions/4 Version 4 schema (raw): {"subject":"com.versa.apicurio.confluent.Employee","version":4,"id":2,"schema":"{\"type\": \"record\", \"name\": \"Employee\", \"namespace\": \"com.versa.apicurio.confluent\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"salary\", \"type\": [\"null\", \"float\"], \"default\": null}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}, {\"name\": \"department\", \"type\": {\"type\": \"enum\", \"name\": \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\", \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\", \"type\": [\"null\", \"string\"], \"default\": null}]}","schemaType":"AVRO","references":[]} Schema ID: 2 Schema content (prettified): {"type": "record", "name": "Employee", "namespace": "com.versa.apicurio.confluent", "fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "salary", "type": ["null", "float"], "default": null}, {"name": "age", "type": ["null", "int"], "default": null}, {"name": "department", "type": {"type": "enum", "name": "DepartmentEnum", "symbols": ["HR", "ENGINEERING", "SALES"]}}, {"name": "email", "type": ["null", "string"], "default": null}, {"name": "new_col", "type": ["null", "string"], "default": null}, {"name": "new_col2", "type": ["null", "string"], "default": null}, {"name": "new_col3", "type": ["null", "string"], "default": null}]} === Testing direct schema retrieval by ID === Testing URL: https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/schemas/ids/2 Schema ID check response code: 200 Schema by ID response: {"schema":"{\"type\": \"record\", \"name\": \"Employee\", \"namespace\": \"com.versa.apicurio.confluent\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"salary\", \"type\": [\"null\", \"float\"], \"default\": null}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}, {\"name\": \"department\", \"type\": {\"type\": \"enum\", \"name\": \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\", \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\", \"type\": [\"null\", \"string\"], \"default\": null}]}","schemaType":"AVRO","references":[]} SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/karanalang/.m2/repository/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/karanalang/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. log4j: Trying to find [log4j.xml] using context classloader org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d. log4j: Trying to find [log4j.xml] using org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d class loader. log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource(). log4j: Trying to find [log4j.properties] using context classloader org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d. log4j: Using URL [jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties] for automatic log4j configuration. log4j: Reading configuration from URL jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties log4j: Parsing for [root] with value=[INFO, stdout]. log4j: Level token is [INFO]. log4j: Category root set to INFO log4j: Parsing appender named "stdout". log4j: Parsing layout options for "stdout". log4j: Setting property [conversionPattern] to [[%d] %p %m (%c:%L)%n]. log4j: End of parsing for "stdout". log4j: Parsed "stdout" options. log4j: Parsing for [org.apache.directory] with value=[ERROR, stdout]. log4j: Level token is [ERROR]. log4j: Category org.apache.directory set to ERROR log4j: Parsing appender named "stdout". log4j: Appender "stdout" was already parsed. log4j: Handling log4j.additivity.org.apache.directory=[null] log4j: Parsing for [org.apache.zookeeper] with value=[ERROR, stdout]. log4j: Level token is [ERROR]. log4j: Category org.apache.zookeeper set to ERROR log4j: Parsing appender named "stdout". log4j: Appender "stdout" was already parsed. log4j: Handling log4j.additivity.org.apache.zookeeper=[null] log4j: Parsing for [org.apache.kafka] with value=[ERROR, stdout]. log4j: Level token is [ERROR]. log4j: Category org.apache.kafka set to ERROR log4j: Parsing appender named "stdout". log4j: Appender "stdout" was already parsed. log4j: Handling log4j.additivity.org.apache.kafka=[null] log4j: Parsing for [kafka] with value=[ERROR, stdout]. log4j: Level token is [ERROR]. log4j: Category kafka set to ERROR log4j: Parsing appender named "stdout". log4j: Appender "stdout" was already parsed. log4j: Handling log4j.additivity.kafka=[null] log4j: Finished configuring. SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] [2025-05-04 15:37:20,915] INFO KafkaAvroSerializerConfig values: auto.register.schemas = false avro.reflection.allow.null = false avro.remove.java.properties = false avro.use.logical.type.converters = false basic.auth.credentials.source = URL basic.auth.user.info = [hidden] bearer.auth.cache.expiry.buffer.seconds = 300 bearer.auth.client.id = null bearer.auth.client.secret = null bearer.auth.credentials.source = USER_INFO bearer.auth.custom.provider.class = null bearer.auth.identity.pool.id = null bearer.auth.issuer.endpoint.url = null bearer.auth.logical.cluster = null bearer.auth.scope = null bearer.auth.scope.claim.name = scope bearer.auth.sub.claim.name = sub bearer.auth.token = [hidden] context.name.strategy = class io.confluent.kafka.serializers.context.NullContextNameStrategy http.connect.timeout.ms = 60000 http.read.timeout.ms = 60000 id.compatibility.strict = true key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy latest.cache.size = 1000 latest.cache.ttl.sec = -1 latest.compatibility.strict = true max.schemas.per.subject = 1000 normalize.schemas = false proxy.host = proxy.port = -1 rule.actions = [] rule.executors = [] rule.service.loader.enable = true schema.format = null schema.reflection = false schema.registry.basic.auth.user.info = [hidden] schema.registry.ssl.cipher.suites = null schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3] schema.registry.ssl.endpoint.identification.algorithm = HTTPS schema.registry.ssl.engine.factory.class = null schema.registry.ssl.key.password = null schema.registry.ssl.keymanager.algorithm = SunX509 schema.registry.ssl.keystore.certificate.chain = null schema.registry.ssl.keystore.key = null schema.registry.ssl.keystore.location = null schema.registry.ssl.keystore.password = null schema.registry.ssl.keystore.type = JKS schema.registry.ssl.protocol = TLSv1.3 schema.registry.ssl.provider = null schema.registry.ssl.secure.random.implementation = null schema.registry.ssl.trustmanager.algorithm = PKIX schema.registry.ssl.truststore.certificates = null schema.registry.ssl.truststore.location = /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks schema.registry.ssl.truststore.password = [hidden] schema.registry.ssl.truststore.type = JKS schema.registry.url = [https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7] use.latest.version = true use.latest.with.metadata = null use.schema.id = 2 value.subject.name.strategy = class io.confluent.kafka.serializers.subject.RecordNameStrategy (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:370) Producer created successfully. Sending messages to: syslog.ueba-nov.v1.nov.nov Sending employee records... Sending record for John Doe with ID: 1 [WARNING] org.apache.kafka.common.errors.SerializationException: Error retrieving schema ID{"type":"record","name":"Employee","namespace":"com.versa.apicurio.confluent","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"salary","type":["null","float"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"department","type":{"type":"enum","name":"DepartmentEnum","symbols":["HR","ENGINEERING","SALES"]}},{"name":"email","type":["null","string"],"default":null},{"name":"new_col","type":["null","string"],"default":null},{"name":"new_col2","type":["null","string"],"default":null},{"name":"new_col3","type":["null","string"],"default":null}]} at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException (AbstractKafkaSchemaSerDe.java:809) at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:176) at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:68) at org.apache.kafka.clients.producer.KafkaProducer.doSend (KafkaProducer.java:1000) at org.apache.kafka.clients.producer.KafkaProducer.send (KafkaProducer.java:947) at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main (VersionBasedKafkaProducer.java:507) at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375) at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364) at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0 (ExecJavaMojo.java:286) at java.lang.Thread.run (Thread.java:833) Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error; error code: 50005 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest (RestService.java:336) at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest (RestService.java:409) at io.confluent.kafka.schemaregistry.client.rest.RestService.getId (RestService.java:907) at io.confluent.kafka.schemaregistry.client.rest.RestService.getId (RestService.java:880) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry (CachedSchemaRegistryClient.java:333) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId (CachedSchemaRegistryClient.java:464) at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getSchemaBySubjectAndId (AbstractKafkaSchemaSerDe.java:534) at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId (AbstractKafkaSchemaSerDe.java:540) at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:130) at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:68) at org.apache.kafka.clients.producer.KafkaProducer.doSend (KafkaProducer.java:1000) at org.apache.kafka.clients.producer.KafkaProducer.send (KafkaProducer.java:947) at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main (VersionBasedKafkaProducer.java:507) at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375) at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364) at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0 (ExecJavaMojo.java:286) at java.lang.Thread.run (Thread.java:833) [INFO] ------------------------------------------------------------------------ [INFO] BUILD FAILURE ``` In function - verifySchemaAndGetId(), i'm able to get the SchemaId & the schema details, but when i use the Producer to publish the schema - it is failing. Any idea on this ? tia! here is the Stackoverflow link -> https://stackoverflow.com/questions/79606146/apicurio-schema-registry-3-0-7-producer-failing-in-confluent-compatibility-mod