I have Apicurio Schema Registry 3.0.7 installed on GKE. I'm trying to use
this in Confluent Compatibility mode .. primarily because on production, my
Kafka Producer is C++ client, and Apicurio serde is not supported for C++
client.

Here is the Java code:

```

import com.versa.apicurio.confluent.Employee;
import com.versa.apicurio.confluent.DepartmentEnum;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;


public class VersionBasedKafkaProducer {

    private static final String REGISTRY_URL =
"https://apicurio-sr.vkp.versa-vani.com";;
    private static final String LETS_ENCRYPT_ROOT_URL =
"https://letsencrypt.org/certs/isrgrootx1.pem";;
    private static final String LETS_ENCRYPT_R11_URL =
"https://letsencrypt.org/certs/2024/r11.pem";;
    private static final String TRUSTSTORE_PASSWORD = "changeit";
    private static final String TRUSTSTORE_PATH = "apicurio-truststore.jks";

    /**
     * Creates a custom truststore with Let's Encrypt certificates
     * @return The path to the created truststore
     */
    private static String createCustomTruststore() throws Exception {
        <truncated>
    }

    /**
     * Downloads an X.509 certificate from a URL
     */
    private static X509Certificate downloadCertificate(String certUrl)
throws Exception {
        <truncated>
    }

    /**
     * Get the Access Token for authentication
     */
    public static String getAccessToken(String tokenUrl, String
clientId, String clientSecret) throws Exception {
        String params = "grant_type=client_credentials"
            + "&client_id=" + clientId
            + "&client_secret=" + clientSecret;
        URL url = new URL(tokenUrl);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
        try (OutputStream os = conn.getOutputStream()) {
            os.write(params.getBytes(StandardCharsets.UTF_8));
        }
        try (Scanner scanner = new Scanner(conn.getInputStream(),
StandardCharsets.UTF_8.name())) {
            String resp = scanner.useDelimiter("\\A").next();
            return resp.split("\"access_token\":\"")[1].split("\"")[0];
        }
    }

    apiv

    /**
 * Verify schema exists and get information about it, returning the schema ID
 */
private static int verifySchemaAndGetId(String token) throws Exception {
    System.out.println("\n=== Verifying schema in registry ===");

    // Subject name with RecordNameStrategy
    String subject = "com.versa.apicurio.confluent.Employee";

    // Check if the subject exists
    String subjectsUrl = String.format("%s/apis/ccompat/v7/subjects",
REGISTRY_URL);
    System.out.println("Checking all subjects at: " + subjectsUrl);

    URL url = new URL(subjectsUrl);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Authorization", "Bearer " + token);

    int responseCode = conn.getResponseCode();
    System.out.println("Response Code: " + responseCode);

    boolean subjectExists = false;

    if (responseCode == 200) {
        try (Scanner scanner = new Scanner(conn.getInputStream(),
StandardCharsets.UTF_8.name())) {
            String resp = scanner.useDelimiter("\\A").next();
            System.out.println("All subjects: " + resp);

            // Check if our subject is in the list
            subjectExists = resp.contains("\"" + subject + "\"");
            System.out.println("Subject '" + subject + "' exists: " +
subjectExists);
        }
    } else {
        System.out.println("Failed to retrieve subjects list. Response
code: " + responseCode);
        return -1;
    }

    if (!subjectExists) {
        System.out.println("Subject '" + subject + "' does not exist
in the registry!");
        return -1;
    }

    // Get versions
    String versionsUrl =
String.format("%s/apis/ccompat/v7/subjects/%s/versions", REGISTRY_URL,
subject);
    System.out.println("Fetching versions at: " + versionsUrl);

    URL versionsUrlObj = new URL(versionsUrl);
    HttpURLConnection versionsConn = (HttpURLConnection)
versionsUrlObj.openConnection();
    versionsConn.setRequestMethod("GET");
    versionsConn.setRequestProperty("Authorization", "Bearer " + token);

    int versionsResponseCode = versionsConn.getResponseCode();
    String highestVersion = "1";

    if (versionsResponseCode == 200) {
        try (Scanner scanner = new
Scanner(versionsConn.getInputStream(), StandardCharsets.UTF_8.name()))
{
            String resp = scanner.useDelimiter("\\A").next();
            System.out.println("Available versions: " + resp);

            if (resp.equals("[]") || resp.trim().isEmpty()) {
                System.out.println("Warning: No versions found for
this subject!");
                return -1;
            } else {
                // Find the highest version number
                Pattern pattern = Pattern.compile("\\d+");
                Matcher matcher = pattern.matcher(resp);
                int maxVersion = 0;
                while (matcher.find()) {
                    try {
                        int version = Integer.parseInt(matcher.group());
                        if (version > maxVersion) {
                            maxVersion = version;
                            highestVersion = matcher.group();
                        }
                    } catch (NumberFormatException e) {
                        // Skip if not a valid number
                    }
                }
                System.out.println("Highest version found: " + highestVersion);
            }
        }
    } else {
        System.out.println("Failed to list versions. Response code: "
+ versionsResponseCode);
        return -1;
    }

    // Get the specific version
    String versionUrl =
String.format("%s/apis/ccompat/v7/subjects/%s/versions/%s",
            REGISTRY_URL, subject, highestVersion);
    System.out.println("Fetching schema at: " + versionUrl);

    URL specificVersionUrl = new URL(versionUrl);
    HttpURLConnection versionConn = (HttpURLConnection)
specificVersionUrl.openConnection();
    versionConn.setRequestMethod("GET");
    versionConn.setRequestProperty("Authorization", "Bearer " + token);

    int versionResponseCode = versionConn.getResponseCode();
    int schemaId = -1;

    if (versionResponseCode == 200) {
        try (Scanner scanner = new
Scanner(versionConn.getInputStream(), StandardCharsets.UTF_8.name()))
{
            String resp = scanner.useDelimiter("\\A").next();
            System.out.println("Version " + highestVersion + " schema
(raw): " + resp);

            // Extract schema ID
            if (resp.contains("\"id\":")) {
                String idStr = resp.split("\"id\":")[1].split(",")[0].trim();
                try {
                    schemaId = Integer.parseInt(idStr);
                    System.out.println("Schema ID: " + schemaId);
                } catch (NumberFormatException e) {
                    System.out.println("Failed to parse schema ID: " + idStr);
                    return -1;
                }
            }

            // Extract and pretty print the schema
            try {
                String schemaJson =
resp.split("\"schema\":\"")[1].split("\",\"schemaType\"")[0];
                // Replace escaped quotes with regular quotes
                schemaJson = schemaJson.replace("\\\"", "\"");
                // Replace escaped backslashes with regular backslashes
                schemaJson = schemaJson.replace("\\\\", "\\");
                System.out.println("\nSchema content (prettified):\n"
+ schemaJson);
            } catch (Exception e) {
                System.out.println("Failed to prettify schema: " +
e.getMessage());
            }
        }
        return schemaId;
    } else {
        System.out.println("Failed to get version. Response code: " +
versionResponseCode);
        return -1;
    }
}

public static void main(String[] args) throws Exception {
    // Create truststore for SSL
    String truststorePath = createCustomTruststore();

    // Get token
    String tokenUrl =
"https://keycloak.vkp.versa-vani.com/realms/readonly-realm/protocol/openid-connect/token";;
    String clientId = "apicurio-registry";
    String clientSecret = "<secret>";
    String token = getAccessToken(tokenUrl, clientId, clientSecret);
    System.out.println("Got access token: " + token.substring(0, 15) + "...");

    // Get schema ID during verification
    int schemaId = verifySchemaAndGetId(token);
    // System.out.println("schemaId",  schemaId);
    if (schemaId <= 0) {
        System.out.println("Failed to get valid schema ID. Exiting.");
        return;
    }

    // Add direct schema ID check
    String schemaIdUrl =
String.format("%s/apis/ccompat/v7/schemas/ids/%d", REGISTRY_URL,
schemaId);
    System.out.println("\n=== Testing direct schema retrieval by ID ===");
    System.out.println("Testing URL: " + schemaIdUrl);

    URL schemaIdUrlObj = new URL(schemaIdUrl);
    HttpURLConnection schemaIdConn = (HttpURLConnection)
schemaIdUrlObj.openConnection();
    schemaIdConn.setRequestMethod("GET");
    schemaIdConn.setRequestProperty("Authorization", "Bearer " + token);

    int schemaIdResponseCode = schemaIdConn.getResponseCode();
    System.out.println("Schema ID check response code: " +
schemaIdResponseCode);

    if (schemaIdResponseCode == 200) {
        try (Scanner scanner = new
Scanner(schemaIdConn.getInputStream(), StandardCharsets.UTF_8.name()))
{
            String resp = scanner.useDelimiter("\\A").next();
            System.out.println("Schema by ID response: " + resp);
        }
    } else {
        System.out.println("Failed to get schema by ID. Response code:
" + schemaIdResponseCode);
    }

    // Set up Kafka producer
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP>:<port>");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getName());

    // Schema Registry config - Updated with dynamic schema ID
    props.put("schema.registry.url", REGISTRY_URL + "/apis/ccompat/v7");
    props.put("bearer.auth.credentials.source", "USER_INFO");
    props.put("bearer.auth.token", token);
    props.put("value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
    props.put("auto.register.schemas", "false");
    props.put("apicurio.registry.as-confluent", "true");
    props.put("apicurio.registry.use-id", "contentId");

    // props.put("value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.TopicNameStrategy");
    // props.put("auto.register.schemas", "true");  //

    // Use the dynamically retrieved schema ID
    props.put("use.schema.id", String.valueOf(schemaId));

    // Additional helpful configuration
    props.put("specific.avro.reader", "true");
    props.put("use.latest.version", "true");

    // SSL settings for Kafka
    props.put("ssl.truststore.location",
"/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc-tf/versa-kafka-poc-tf-cluster-ca-cert/versa-kafka-poc-tf-ca.p12");
    props.put("ssl.truststore.password", "pwd");
    props.put("ssl.truststore.type", "PKCS12");
    props.put("ssl.keystore.location",
"/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc/syslog-vani-prefix/syslog-vani-prefix.p12");
    props.put("ssl.keystore.password", "pwd");
    props.put("ssl.keystore.type", "PKCS12");
    props.put("security.protocol", "SSL");

    // Schema Registry SSL settings
    props.put("schema.registry.ssl.truststore.location", truststorePath);
    props.put("schema.registry.ssl.truststore.password", TRUSTSTORE_PASSWORD);
    props.put("schema.registry.ssl.truststore.type", "JKS");
    props.put("schema.registry.ssl.endpoint.identification.algorithm", "HTTPS");

    props.put("schema.reflection", "false");
    props.put("specific.avro.reader", "true");
    props.put("schema.registry.group.id", "default");

    // Add debugging parameter
    props.put("debug", "all");

    // For Apicurio 3.0.7, schema may be under a group
    props.put("schema.registry.group.id", "default");

    // Enhanced debugging
    
System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.serializers",
"TRACE");
    
System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.schemaregistry",
"TRACE");

    // Using your original topic
    String topic = "<topic>";

    // Update to your main method where you create and send messages
try (Producer<String, Employee> producer = new KafkaProducer<>(props)) {
    System.out.println("Producer created successfully. Sending
messages to: " + topic);

    System.out.println("Sending employee records...");

    // Create employees with null values for new_col2 and new_col3
    Employee[] employees = {
        Employee.newBuilder()
                .setId(1)
                .setName("John Doe")
                .setAge(30)
                .setSalary(75000.0f)
                .setDepartment(DepartmentEnum.ENGINEERING)
                .setEmail(null)
                .setNewCol(null)
                .setNewCol2(null)
                .setNewCol3(null)
                .build(),
        Employee.newBuilder()
                .setId(2)
                .setName("Jane Smith")
                .setAge(28)
                .setSalary(85000.0f)
                .setDepartment(DepartmentEnum.HR)
                .setEmail(null)
                .setNewCol(null)
                .setNewCol2(null)
                .setNewCol3(null)
                .build(),
        Employee.newBuilder()
                .setId(3)
                .setName("Bob Johnson")
                .setSalary(65000.0f)
                .setDepartment(DepartmentEnum.SALES)
                .setAge(null)
                .setEmail(null)
                .setNewCol(null)
                .setNewCol2(null)
                .setNewCol3(null)
                .build()
    };

    // Send each record
    for (Employee emp : employees) {
        System.out.println("Sending record for " + emp.getName() + "
with ID: " + emp.getId());
        ProducerRecord<String, Employee> record = new
ProducerRecord<>(topic, String.valueOf(emp.getId()), emp);

        // Use asynchronous sending with a callback instead of synchronous get()
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    System.err.println("Error sending record for " +
emp.getName() + ":");
                    e.printStackTrace();
                } else {
                    System.out.println("Successfully sent " +
emp.getName() + " to partition " +
                            metadata.partition() + " at offset " +
metadata.offset());
                }
            }
        });

        // Small delay to see logs clearly
        Thread.sleep(100);
    }

    // Ensure all records are sent before closing producer
    producer.flush();
    System.out.println("All messages sent successfully");
}

```

here is the error :

```

(base) Karans-MacBook-Pro:apicurio_confluent karanalang$
(base) Karans-MacBook-Pro:apicurio_confluent karanalang$ mvn exec:java
-Dexec.mainClass="com.versa.apicurio.confluent.VersionBasedKafkaProducer"
-Dlog4j.debug
[INFO] Scanning for projects...
[INFO]
[INFO] ----------< com.versa.apicurio.confluent:apicurio_confluent >-----------
[INFO] Building apicurio_confluent 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.5.0:java (default-cli) @ apicurio_confluent ---
Creating custom truststore with Let's Encrypt certificates...
Downloading Let's Encrypt Root certificate...
Downloading Let's Encrypt R11 certificate...
Custom truststore created at:
/Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks
Got access token: eyJhbGciOiJSUzI...

=== Verifying schema in registry ===
Checking all subjects at:
https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects
Response Code: 200
All subjects: ["com.versa.apicurio.confluent.Employee"]
Subject 'com.versa.apicurio.confluent.Employee' exists: true
Fetching versions at:
https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions
Available versions: [1,2,3,4]
Highest version found: 4
Fetching schema at:
https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions/4
Version 4 schema (raw):
{"subject":"com.versa.apicurio.confluent.Employee","version":4,"id":2,"schema":"{\"type\":
\"record\", \"name\": \"Employee\", \"namespace\":
\"com.versa.apicurio.confluent\", \"fields\": [{\"name\": \"id\",
\"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"},
{\"name\": \"salary\", \"type\": [\"null\", \"float\"], \"default\":
null}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\":
null}, {\"name\": \"department\", \"type\": {\"type\": \"enum\",
\"name\": \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\",
\"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"],
\"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\",
\"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\":
[\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\",
\"type\": [\"null\", \"string\"], \"default\":
null}]}","schemaType":"AVRO","references":[]}
Schema ID: 2

Schema content (prettified):
{"type": "record", "name": "Employee", "namespace":
"com.versa.apicurio.confluent", "fields": [{"name": "id", "type":
"int"}, {"name": "name", "type": "string"}, {"name": "salary", "type":
["null", "float"], "default": null}, {"name": "age", "type": ["null",
"int"], "default": null}, {"name": "department", "type": {"type":
"enum", "name": "DepartmentEnum", "symbols": ["HR", "ENGINEERING",
"SALES"]}}, {"name": "email", "type": ["null", "string"], "default":
null}, {"name": "new_col", "type": ["null", "string"], "default":
null}, {"name": "new_col2", "type": ["null", "string"], "default":
null}, {"name": "new_col3", "type": ["null", "string"], "default":
null}]}

=== Testing direct schema retrieval by ID ===
Testing URL: 
https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/schemas/ids/2
Schema ID check response code: 200
Schema by ID response: {"schema":"{\"type\": \"record\", \"name\":
\"Employee\", \"namespace\": \"com.versa.apicurio.confluent\",
\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\":
\"name\", \"type\": \"string\"}, {\"name\": \"salary\", \"type\":
[\"null\", \"float\"], \"default\": null}, {\"name\": \"age\",
\"type\": [\"null\", \"int\"], \"default\": null}, {\"name\":
\"department\", \"type\": {\"type\": \"enum\", \"name\":
\"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\",
\"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"],
\"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\",
\"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\":
[\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\",
\"type\": [\"null\", \"string\"], \"default\":
null}]}","schemaType":"AVRO","references":[]}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/karanalang/.m2/repository/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/karanalang/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
log4j: Trying to find [log4j.xml] using context classloader
org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d.
log4j: Trying to find [log4j.xml] using
org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d
class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader
org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d.
log4j: Using URL
[jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties]
for automatic log4j configuration.
log4j: Reading configuration from URL
jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties
log4j: Parsing for [root] with value=[INFO, stdout].
log4j: Level token is [INFO].
log4j: Category root set to INFO
log4j: Parsing appender named "stdout".
log4j: Parsing layout options for "stdout".
log4j: Setting property [conversionPattern] to [[%d] %p %m (%c:%L)%n].
log4j: End of parsing for "stdout".
log4j: Parsed "stdout" options.
log4j: Parsing for [org.apache.directory] with value=[ERROR, stdout].
log4j: Level token is [ERROR].
log4j: Category org.apache.directory set to ERROR
log4j: Parsing appender named "stdout".
log4j: Appender "stdout" was already parsed.
log4j: Handling log4j.additivity.org.apache.directory=[null]
log4j: Parsing for [org.apache.zookeeper] with value=[ERROR, stdout].
log4j: Level token is [ERROR].
log4j: Category org.apache.zookeeper set to ERROR
log4j: Parsing appender named "stdout".
log4j: Appender "stdout" was already parsed.
log4j: Handling log4j.additivity.org.apache.zookeeper=[null]
log4j: Parsing for [org.apache.kafka] with value=[ERROR, stdout].
log4j: Level token is [ERROR].
log4j: Category org.apache.kafka set to ERROR
log4j: Parsing appender named "stdout".
log4j: Appender "stdout" was already parsed.
log4j: Handling log4j.additivity.org.apache.kafka=[null]
log4j: Parsing for [kafka] with value=[ERROR, stdout].
log4j: Level token is [ERROR].
log4j: Category kafka set to ERROR
log4j: Parsing appender named "stdout".
log4j: Appender "stdout" was already parsed.
log4j: Handling log4j.additivity.kafka=[null]
log4j: Finished configuring.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
[2025-05-04 15:37:20,915] INFO KafkaAvroSerializerConfig values:
    auto.register.schemas = false
    avro.reflection.allow.null = false
    avro.remove.java.properties = false
    avro.use.logical.type.converters = false
    basic.auth.credentials.source = URL
    basic.auth.user.info = [hidden]
    bearer.auth.cache.expiry.buffer.seconds = 300
    bearer.auth.client.id = null
    bearer.auth.client.secret = null
    bearer.auth.credentials.source = USER_INFO
    bearer.auth.custom.provider.class = null
    bearer.auth.identity.pool.id = null
    bearer.auth.issuer.endpoint.url = null
    bearer.auth.logical.cluster = null
    bearer.auth.scope = null
    bearer.auth.scope.claim.name = scope
    bearer.auth.sub.claim.name = sub
    bearer.auth.token = [hidden]
    context.name.strategy = class
io.confluent.kafka.serializers.context.NullContextNameStrategy
    http.connect.timeout.ms = 60000
    http.read.timeout.ms = 60000
    id.compatibility.strict = true
    key.subject.name.strategy = class
io.confluent.kafka.serializers.subject.TopicNameStrategy
    latest.cache.size = 1000
    latest.cache.ttl.sec = -1
    latest.compatibility.strict = true
    max.schemas.per.subject = 1000
    normalize.schemas = false
    proxy.host =
    proxy.port = -1
    rule.actions = []
    rule.executors = []
    rule.service.loader.enable = true
    schema.format = null
    schema.reflection = false
    schema.registry.basic.auth.user.info = [hidden]
    schema.registry.ssl.cipher.suites = null
    schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    schema.registry.ssl.endpoint.identification.algorithm = HTTPS
    schema.registry.ssl.engine.factory.class = null
    schema.registry.ssl.key.password = null
    schema.registry.ssl.keymanager.algorithm = SunX509
    schema.registry.ssl.keystore.certificate.chain = null
    schema.registry.ssl.keystore.key = null
    schema.registry.ssl.keystore.location = null
    schema.registry.ssl.keystore.password = null
    schema.registry.ssl.keystore.type = JKS
    schema.registry.ssl.protocol = TLSv1.3
    schema.registry.ssl.provider = null
    schema.registry.ssl.secure.random.implementation = null
    schema.registry.ssl.trustmanager.algorithm = PKIX
    schema.registry.ssl.truststore.certificates = null
    schema.registry.ssl.truststore.location =
/Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks
    schema.registry.ssl.truststore.password = [hidden]
    schema.registry.ssl.truststore.type = JKS
    schema.registry.url =
[https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7]
    use.latest.version = true
    use.latest.with.metadata = null
    use.schema.id = 2
    value.subject.name.strategy = class
io.confluent.kafka.serializers.subject.RecordNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:370)
Producer created successfully. Sending messages to: syslog.ueba-nov.v1.nov.nov
Sending employee records...
Sending record for John Doe with ID: 1
[WARNING]
org.apache.kafka.common.errors.SerializationException: Error
retrieving schema
ID{"type":"record","name":"Employee","namespace":"com.versa.apicurio.confluent","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"salary","type":["null","float"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"department","type":{"type":"enum","name":"DepartmentEnum","symbols":["HR","ENGINEERING","SALES"]}},{"name":"email","type":["null","string"],"default":null},{"name":"new_col","type":["null","string"],"default":null},{"name":"new_col2","type":["null","string"],"default":null},{"name":"new_col3","type":["null","string"],"default":null}]}
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException
(AbstractKafkaSchemaSerDe.java:809)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl
(AbstractKafkaAvroSerializer.java:176)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize
(KafkaAvroSerializer.java:68)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend
(KafkaProducer.java:1000)
    at org.apache.kafka.clients.producer.KafkaProducer.send
(KafkaProducer.java:947)
    at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main
(VersionBasedKafkaProducer.java:507)
    at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375)
    at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364)
    at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0
(ExecJavaMojo.java:286)
    at java.lang.Thread.run (Thread.java:833)
Caused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Error; error code: 50005
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest
(RestService.java:336)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest
(RestService.java:409)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId
(RestService.java:907)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId
(RestService.java:880)
    at 
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry
(CachedSchemaRegistryClient.java:333)
    at 
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId
(CachedSchemaRegistryClient.java:464)
    at 
io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getSchemaBySubjectAndId
(AbstractKafkaSchemaSerDe.java:534)
    at 
io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId
(AbstractKafkaSchemaSerDe.java:540)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl
(AbstractKafkaAvroSerializer.java:130)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize
(KafkaAvroSerializer.java:68)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend
(KafkaProducer.java:1000)
    at org.apache.kafka.clients.producer.KafkaProducer.send
(KafkaProducer.java:947)
    at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main
(VersionBasedKafkaProducer.java:507)
    at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375)
    at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364)
    at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0
(ExecJavaMojo.java:286)
    at java.lang.Thread.run (Thread.java:833)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE

```


In function - verifySchemaAndGetId(), i'm able to get the SchemaId & the
schema details, but when i use the Producer to publish the schema - it is
failing.

Any idea on this ?

tia!


here is the Stackoverflow link ->


https://stackoverflow.com/questions/79606146/apicurio-schema-registry-3-0-7-producer-failing-in-confluent-compatibility-mod

Reply via email to