Hi Karan, This is not the right place for this question. I saw you also asked in the Apicurio-registry repo <https://github.com/Apicurio/apicurio-registry/issues/6189>. Please wait for their response there.
Thanks. Luke On Mon, May 5, 2025 at 7:18 AM karan alang <karan.al...@gmail.com> wrote: > I have Apicurio Schema Registry 3.0.7 installed on GKE. I'm trying to use > this in Confluent Compatibility mode .. primarily because on production, my > Kafka Producer is C++ client, and Apicurio serde is not supported for C++ > client. > > Here is the Java code: > > ``` > > import com.versa.apicurio.confluent.Employee; > import com.versa.apicurio.confluent.DepartmentEnum; > import org.apache.kafka.clients.producer.*; > import org.apache.kafka.common.serialization.StringSerializer; > import io.confluent.kafka.serializers.KafkaAvroSerializer; > > > public class VersionBasedKafkaProducer { > > private static final String REGISTRY_URL = > "https://apicurio-sr.vkp.versa-vani.com"; > private static final String LETS_ENCRYPT_ROOT_URL = > "https://letsencrypt.org/certs/isrgrootx1.pem"; > private static final String LETS_ENCRYPT_R11_URL = > "https://letsencrypt.org/certs/2024/r11.pem"; > private static final String TRUSTSTORE_PASSWORD = "changeit"; > private static final String TRUSTSTORE_PATH = > "apicurio-truststore.jks"; > > /** > * Creates a custom truststore with Let's Encrypt certificates > * @return The path to the created truststore > */ > private static String createCustomTruststore() throws Exception { > <truncated> > } > > /** > * Downloads an X.509 certificate from a URL > */ > private static X509Certificate downloadCertificate(String certUrl) > throws Exception { > <truncated> > } > > /** > * Get the Access Token for authentication > */ > public static String getAccessToken(String tokenUrl, String > clientId, String clientSecret) throws Exception { > String params = "grant_type=client_credentials" > + "&client_id=" + clientId > + "&client_secret=" + clientSecret; > URL url = new URL(tokenUrl); > HttpURLConnection conn = (HttpURLConnection) url.openConnection(); > conn.setRequestMethod("POST"); > conn.setDoOutput(true); > conn.setRequestProperty("Content-Type", > 
"application/x-www-form-urlencoded"); > try (OutputStream os = conn.getOutputStream()) { > os.write(params.getBytes(StandardCharsets.UTF_8)); > } > try (Scanner scanner = new Scanner(conn.getInputStream(), > StandardCharsets.UTF_8.name())) { > String resp = scanner.useDelimiter("\\A").next(); > return resp.split("\"access_token\":\"")[1].split("\"")[0]; > } > } > > apiv > > /** > * Verify schema exists and get information about it, returning the schema > ID > */ > private static int verifySchemaAndGetId(String token) throws Exception { > System.out.println("\n=== Verifying schema in registry ==="); > > // Subject name with RecordNameStrategy > String subject = "com.versa.apicurio.confluent.Employee"; > > // Check if the subject exists > String subjectsUrl = String.format("%s/apis/ccompat/v7/subjects", > REGISTRY_URL); > System.out.println("Checking all subjects at: " + subjectsUrl); > > URL url = new URL(subjectsUrl); > HttpURLConnection conn = (HttpURLConnection) url.openConnection(); > conn.setRequestMethod("GET"); > conn.setRequestProperty("Authorization", "Bearer " + token); > > int responseCode = conn.getResponseCode(); > System.out.println("Response Code: " + responseCode); > > boolean subjectExists = false; > > if (responseCode == 200) { > try (Scanner scanner = new Scanner(conn.getInputStream(), > StandardCharsets.UTF_8.name())) { > String resp = scanner.useDelimiter("\\A").next(); > System.out.println("All subjects: " + resp); > > // Check if our subject is in the list > subjectExists = resp.contains("\"" + subject + "\""); > System.out.println("Subject '" + subject + "' exists: " + > subjectExists); > } > } else { > System.out.println("Failed to retrieve subjects list. 
Response > code: " + responseCode); > return -1; > } > > if (!subjectExists) { > System.out.println("Subject '" + subject + "' does not exist > in the registry!"); > return -1; > } > > // Get versions > String versionsUrl = > String.format("%s/apis/ccompat/v7/subjects/%s/versions", REGISTRY_URL, > subject); > System.out.println("Fetching versions at: " + versionsUrl); > > URL versionsUrlObj = new URL(versionsUrl); > HttpURLConnection versionsConn = (HttpURLConnection) > versionsUrlObj.openConnection(); > versionsConn.setRequestMethod("GET"); > versionsConn.setRequestProperty("Authorization", "Bearer " + token); > > int versionsResponseCode = versionsConn.getResponseCode(); > String highestVersion = "1"; > > if (versionsResponseCode == 200) { > try (Scanner scanner = new > Scanner(versionsConn.getInputStream(), StandardCharsets.UTF_8.name())) > { > String resp = scanner.useDelimiter("\\A").next(); > System.out.println("Available versions: " + resp); > > if (resp.equals("[]") || resp.trim().isEmpty()) { > System.out.println("Warning: No versions found for > this subject!"); > return -1; > } else { > // Find the highest version number > Pattern pattern = Pattern.compile("\\d+"); > Matcher matcher = pattern.matcher(resp); > int maxVersion = 0; > while (matcher.find()) { > try { > int version = Integer.parseInt(matcher.group()); > if (version > maxVersion) { > maxVersion = version; > highestVersion = matcher.group(); > } > } catch (NumberFormatException e) { > // Skip if not a valid number > } > } > System.out.println("Highest version found: " + > highestVersion); > } > } > } else { > System.out.println("Failed to list versions. 
Response code: " > + versionsResponseCode); > return -1; > } > > // Get the specific version > String versionUrl = > String.format("%s/apis/ccompat/v7/subjects/%s/versions/%s", > REGISTRY_URL, subject, highestVersion); > System.out.println("Fetching schema at: " + versionUrl); > > URL specificVersionUrl = new URL(versionUrl); > HttpURLConnection versionConn = (HttpURLConnection) > specificVersionUrl.openConnection(); > versionConn.setRequestMethod("GET"); > versionConn.setRequestProperty("Authorization", "Bearer " + token); > > int versionResponseCode = versionConn.getResponseCode(); > int schemaId = -1; > > if (versionResponseCode == 200) { > try (Scanner scanner = new > Scanner(versionConn.getInputStream(), StandardCharsets.UTF_8.name())) > { > String resp = scanner.useDelimiter("\\A").next(); > System.out.println("Version " + highestVersion + " schema > (raw): " + resp); > > // Extract schema ID > if (resp.contains("\"id\":")) { > String idStr = > resp.split("\"id\":")[1].split(",")[0].trim(); > try { > schemaId = Integer.parseInt(idStr); > System.out.println("Schema ID: " + schemaId); > } catch (NumberFormatException e) { > System.out.println("Failed to parse schema ID: " + > idStr); > return -1; > } > } > > // Extract and pretty print the schema > try { > String schemaJson = > resp.split("\"schema\":\"")[1].split("\",\"schemaType\"")[0]; > // Replace escaped quotes with regular quotes > schemaJson = schemaJson.replace("\\\"", "\""); > // Replace escaped backslashes with regular backslashes > schemaJson = schemaJson.replace("\\\\", "\\"); > System.out.println("\nSchema content (prettified):\n" > + schemaJson); > } catch (Exception e) { > System.out.println("Failed to prettify schema: " + > e.getMessage()); > } > } > return schemaId; > } else { > System.out.println("Failed to get version. 
Response code: " + > versionResponseCode); > return -1; > } > } > > public static void main(String[] args) throws Exception { > // Create truststore for SSL > String truststorePath = createCustomTruststore(); > > // Get token > String tokenUrl = > " > https://keycloak.vkp.versa-vani.com/realms/readonly-realm/protocol/openid-connect/token > "; > String clientId = "apicurio-registry"; > String clientSecret = "<secret>"; > String token = getAccessToken(tokenUrl, clientId, clientSecret); > System.out.println("Got access token: " + token.substring(0, 15) + > "..."); > > // Get schema ID during verification > int schemaId = verifySchemaAndGetId(token); > // System.out.println("schemaId", schemaId); > if (schemaId <= 0) { > System.out.println("Failed to get valid schema ID. Exiting."); > return; > } > > // Add direct schema ID check > String schemaIdUrl = > String.format("%s/apis/ccompat/v7/schemas/ids/%d", REGISTRY_URL, > schemaId); > System.out.println("\n=== Testing direct schema retrieval by ID ==="); > System.out.println("Testing URL: " + schemaIdUrl); > > URL schemaIdUrlObj = new URL(schemaIdUrl); > HttpURLConnection schemaIdConn = (HttpURLConnection) > schemaIdUrlObj.openConnection(); > schemaIdConn.setRequestMethod("GET"); > schemaIdConn.setRequestProperty("Authorization", "Bearer " + token); > > int schemaIdResponseCode = schemaIdConn.getResponseCode(); > System.out.println("Schema ID check response code: " + > schemaIdResponseCode); > > if (schemaIdResponseCode == 200) { > try (Scanner scanner = new > Scanner(schemaIdConn.getInputStream(), StandardCharsets.UTF_8.name())) > { > String resp = scanner.useDelimiter("\\A").next(); > System.out.println("Schema by ID response: " + resp); > } > } else { > System.out.println("Failed to get schema by ID. 
Response code: > " + schemaIdResponseCode); > } > > // Set up Kafka producer > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP>:<port>"); > props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > KafkaAvroSerializer.class.getName()); > > // Schema Registry config - Updated with dynamic schema ID > props.put("schema.registry.url", REGISTRY_URL + "/apis/ccompat/v7"); > props.put("bearer.auth.credentials.source", "USER_INFO"); > props.put("bearer.auth.token", token); > props.put("value.subject.name.strategy", > "io.confluent.kafka.serializers.subject.RecordNameStrategy"); > props.put("auto.register.schemas", "false"); > props.put("apicurio.registry.as-confluent", "true"); > props.put("apicurio.registry.use-id", "contentId"); > > // props.put("value.subject.name.strategy", > "io.confluent.kafka.serializers.subject.TopicNameStrategy"); > // props.put("auto.register.schemas", "true"); // > > // Use the dynamically retrieved schema ID > props.put("use.schema.id", String.valueOf(schemaId)); > > // Additional helpful configuration > props.put("specific.avro.reader", "true"); > props.put("use.latest.version", "true"); > > // SSL settings for Kafka > props.put("ssl.truststore.location", > > "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc-tf/versa-kafka-poc-tf-cluster-ca-cert/versa-kafka-poc-tf-ca.p12"); > props.put("ssl.truststore.password", "pwd"); > props.put("ssl.truststore.type", "PKCS12"); > props.put("ssl.keystore.location", > > "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc/syslog-vani-prefix/syslog-vani-prefix.p12"); > props.put("ssl.keystore.password", "pwd"); > props.put("ssl.keystore.type", "PKCS12"); > props.put("security.protocol", "SSL"); > > // Schema Registry SSL settings > props.put("schema.registry.ssl.truststore.location", truststorePath); > 
props.put("schema.registry.ssl.truststore.password", > TRUSTSTORE_PASSWORD); > props.put("schema.registry.ssl.truststore.type", "JKS"); > props.put("schema.registry.ssl.endpoint.identification.algorithm", > "HTTPS"); > > props.put("schema.reflection", "false"); > props.put("specific.avro.reader", "true"); > props.put("schema.registry.group.id", "default"); > > // Add debugging parameter > props.put("debug", "all"); > > // For Apicurio 3.0.7, schema may be under a group > props.put("schema.registry.group.id", "default"); > > // Enhanced debugging > > System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.serializers", > "TRACE"); > > System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.schemaregistry", > "TRACE"); > > // Using your original topic > String topic = "<topic>"; > > // Update to your main method where you create and send messages > try (Producer<String, Employee> producer = new KafkaProducer<>(props)) { > System.out.println("Producer created successfully. 
Sending > messages to: " + topic); > > System.out.println("Sending employee records..."); > > // Create employees with null values for new_col2 and new_col3 > Employee[] employees = { > Employee.newBuilder() > .setId(1) > .setName("John Doe") > .setAge(30) > .setSalary(75000.0f) > .setDepartment(DepartmentEnum.ENGINEERING) > .setEmail(null) > .setNewCol(null) > .setNewCol2(null) > .setNewCol3(null) > .build(), > Employee.newBuilder() > .setId(2) > .setName("Jane Smith") > .setAge(28) > .setSalary(85000.0f) > .setDepartment(DepartmentEnum.HR) > .setEmail(null) > .setNewCol(null) > .setNewCol2(null) > .setNewCol3(null) > .build(), > Employee.newBuilder() > .setId(3) > .setName("Bob Johnson") > .setSalary(65000.0f) > .setDepartment(DepartmentEnum.SALES) > .setAge(null) > .setEmail(null) > .setNewCol(null) > .setNewCol2(null) > .setNewCol3(null) > .build() > }; > > // Send each record > for (Employee emp : employees) { > System.out.println("Sending record for " + emp.getName() + " > with ID: " + emp.getId()); > ProducerRecord<String, Employee> record = new > ProducerRecord<>(topic, String.valueOf(emp.getId()), emp); > > // Use asynchronous sending with a callback instead of synchronous > get() > producer.send(record, new Callback() { > @Override > public void onCompletion(RecordMetadata metadata, Exception e) > { > if (e != null) { > System.err.println("Error sending record for " + > emp.getName() + ":"); > e.printStackTrace(); > } else { > System.out.println("Successfully sent " + > emp.getName() + " to partition " + > metadata.partition() + " at offset " + > metadata.offset()); > } > } > }); > > // Small delay to see logs clearly > Thread.sleep(100); > } > > // Ensure all records are sent before closing producer > producer.flush(); > System.out.println("All messages sent successfully"); > } > > ``` > > here is the error : > > ``` > > (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ > (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ mvn exec:java > 
-Dexec.mainClass="com.versa.apicurio.confluent.VersionBasedKafkaProducer" > -Dlog4j.debug > [INFO] Scanning for projects... > [INFO] > [INFO] ----------< com.versa.apicurio.confluent:apicurio_confluent > >----------- > [INFO] Building apicurio_confluent 1.0-SNAPSHOT > [INFO] --------------------------------[ jar > ]--------------------------------- > [INFO] > [INFO] --- exec-maven-plugin:3.5.0:java (default-cli) @ apicurio_confluent > --- > Creating custom truststore with Let's Encrypt certificates... > Downloading Let's Encrypt Root certificate... > Downloading Let's Encrypt R11 certificate... > Custom truststore created at: > > /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks > Got access token: eyJhbGciOiJSUzI... > > === Verifying schema in registry === > Checking all subjects at: > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects > Response Code: 200 > All subjects: ["com.versa.apicurio.confluent.Employee"] > Subject 'com.versa.apicurio.confluent.Employee' exists: true > Fetching versions at: > > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions > Available versions: [1,2,3,4] > Highest version found: 4 > Fetching schema at: > > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions/4 > Version 4 schema (raw): > > {"subject":"com.versa.apicurio.confluent.Employee","version":4,"id":2,"schema":"{\"type\": > \"record\", \"name\": \"Employee\", \"namespace\": > \"com.versa.apicurio.confluent\", \"fields\": [{\"name\": \"id\", > \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, > {\"name\": \"salary\", \"type\": [\"null\", \"float\"], \"default\": > null}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": > null}, {\"name\": \"department\", \"type\": {\"type\": \"enum\", > \"name\": \"DepartmentEnum\", \"symbols\": [\"HR\", 
\"ENGINEERING\", > \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"], > \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\", > \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\": > [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\", > \"type\": [\"null\", \"string\"], \"default\": > null}]}","schemaType":"AVRO","references":[]} > Schema ID: 2 > > Schema content (prettified): > {"type": "record", "name": "Employee", "namespace": > "com.versa.apicurio.confluent", "fields": [{"name": "id", "type": > "int"}, {"name": "name", "type": "string"}, {"name": "salary", "type": > ["null", "float"], "default": null}, {"name": "age", "type": ["null", > "int"], "default": null}, {"name": "department", "type": {"type": > "enum", "name": "DepartmentEnum", "symbols": ["HR", "ENGINEERING", > "SALES"]}}, {"name": "email", "type": ["null", "string"], "default": > null}, {"name": "new_col", "type": ["null", "string"], "default": > null}, {"name": "new_col2", "type": ["null", "string"], "default": > null}, {"name": "new_col3", "type": ["null", "string"], "default": > null}]} > > === Testing direct schema retrieval by ID === > Testing URL: > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/schemas/ids/2 > Schema ID check response code: 200 > Schema by ID response: {"schema":"{\"type\": \"record\", \"name\": > \"Employee\", \"namespace\": \"com.versa.apicurio.confluent\", > \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": > \"name\", \"type\": \"string\"}, {\"name\": \"salary\", \"type\": > [\"null\", \"float\"], \"default\": null}, {\"name\": \"age\", > \"type\": [\"null\", \"int\"], \"default\": null}, {\"name\": > \"department\", \"type\": {\"type\": \"enum\", \"name\": > \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\", > \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"], > \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\", > \"string\"], \"default\": null}, 
{\"name\": \"new_col2\", \"type\": > [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\", > \"type\": [\"null\", \"string\"], \"default\": > null}]}","schemaType":"AVRO","references":[]} > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > > [jar:file:/Users/karanalang/.m2/repository/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > > [jar:file:/Users/karanalang/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > log4j: Trying to find [log4j.xml] using context classloader > org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d. > log4j: Trying to find [log4j.xml] using > org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d > class loader. > log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource(). > log4j: Trying to find [log4j.properties] using context classloader > org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d. > log4j: Using URL > > [jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties] > for automatic log4j configuration. > log4j: Reading configuration from URL > > jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties > log4j: Parsing for [root] with value=[INFO, stdout]. > log4j: Level token is [INFO]. > log4j: Category root set to INFO > log4j: Parsing appender named "stdout". > log4j: Parsing layout options for "stdout". > log4j: Setting property [conversionPattern] to [[%d] %p %m (%c:%L)%n]. > log4j: End of parsing for "stdout". > log4j: Parsed "stdout" options. > log4j: Parsing for [org.apache.directory] with value=[ERROR, stdout]. 
> log4j: Level token is [ERROR]. > log4j: Category org.apache.directory set to ERROR > log4j: Parsing appender named "stdout". > log4j: Appender "stdout" was already parsed. > log4j: Handling log4j.additivity.org.apache.directory=[null] > log4j: Parsing for [org.apache.zookeeper] with value=[ERROR, stdout]. > log4j: Level token is [ERROR]. > log4j: Category org.apache.zookeeper set to ERROR > log4j: Parsing appender named "stdout". > log4j: Appender "stdout" was already parsed. > log4j: Handling log4j.additivity.org.apache.zookeeper=[null] > log4j: Parsing for [org.apache.kafka] with value=[ERROR, stdout]. > log4j: Level token is [ERROR]. > log4j: Category org.apache.kafka set to ERROR > log4j: Parsing appender named "stdout". > log4j: Appender "stdout" was already parsed. > log4j: Handling log4j.additivity.org.apache.kafka=[null] > log4j: Parsing for [kafka] with value=[ERROR, stdout]. > log4j: Level token is [ERROR]. > log4j: Category kafka set to ERROR > log4j: Parsing appender named "stdout". > log4j: Appender "stdout" was already parsed. > log4j: Handling log4j.additivity.kafka=[null] > log4j: Finished configuring. 
> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] > [2025-05-04 15:37:20,915] INFO KafkaAvroSerializerConfig values: > auto.register.schemas = false > avro.reflection.allow.null = false > avro.remove.java.properties = false > avro.use.logical.type.converters = false > basic.auth.credentials.source = URL > basic.auth.user.info = [hidden] > bearer.auth.cache.expiry.buffer.seconds = 300 > bearer.auth.client.id = null > bearer.auth.client.secret = null > bearer.auth.credentials.source = USER_INFO > bearer.auth.custom.provider.class = null > bearer.auth.identity.pool.id = null > bearer.auth.issuer.endpoint.url = null > bearer.auth.logical.cluster = null > bearer.auth.scope = null > bearer.auth.scope.claim.name = scope > bearer.auth.sub.claim.name = sub > bearer.auth.token = [hidden] > context.name.strategy = class > io.confluent.kafka.serializers.context.NullContextNameStrategy > http.connect.timeout.ms = 60000 > http.read.timeout.ms = 60000 > id.compatibility.strict = true > key.subject.name.strategy = class > io.confluent.kafka.serializers.subject.TopicNameStrategy > latest.cache.size = 1000 > latest.cache.ttl.sec = -1 > latest.compatibility.strict = true > max.schemas.per.subject = 1000 > normalize.schemas = false > proxy.host = > proxy.port = -1 > rule.actions = [] > rule.executors = [] > rule.service.loader.enable = true > schema.format = null > schema.reflection = false > schema.registry.basic.auth.user.info = [hidden] > schema.registry.ssl.cipher.suites = null > schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3] > schema.registry.ssl.endpoint.identification.algorithm = HTTPS > schema.registry.ssl.engine.factory.class = null > schema.registry.ssl.key.password = null > schema.registry.ssl.keymanager.algorithm = SunX509 > schema.registry.ssl.keystore.certificate.chain = null > schema.registry.ssl.keystore.key = null > schema.registry.ssl.keystore.location = null > schema.registry.ssl.keystore.password = null > 
schema.registry.ssl.keystore.type = JKS > schema.registry.ssl.protocol = TLSv1.3 > schema.registry.ssl.provider = null > schema.registry.ssl.secure.random.implementation = null > schema.registry.ssl.trustmanager.algorithm = PKIX > schema.registry.ssl.truststore.certificates = null > schema.registry.ssl.truststore.location = > > /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks > schema.registry.ssl.truststore.password = [hidden] > schema.registry.ssl.truststore.type = JKS > schema.registry.url = > [https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7] > use.latest.version = true > use.latest.with.metadata = null > use.schema.id = 2 > value.subject.name.strategy = class > io.confluent.kafka.serializers.subject.RecordNameStrategy > (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:370) > Producer created successfully. Sending messages to: > syslog.ueba-nov.v1.nov.nov > Sending employee records... > Sending record for John Doe with ID: 1 > [WARNING] > org.apache.kafka.common.errors.SerializationException: Error > retrieving schema > > ID{"type":"record","name":"Employee","namespace":"com.versa.apicurio.confluent","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"salary","type":["null","float"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"department","type":{"type":"enum","name":"DepartmentEnum","symbols":["HR","ENGINEERING","SALES"]}},{"name":"email","type":["null","string"],"default":null},{"name":"new_col","type":["null","string"],"default":null},{"name":"new_col2","type":["null","string"],"default":null},{"name":"new_col3","type":["null","string"],"default":null}]} > at > io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException > (AbstractKafkaSchemaSerDe.java:809) > at > io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl > (AbstractKafkaAvroSerializer.java:176) > at 
io.confluent.kafka.serializers.KafkaAvroSerializer.serialize > (KafkaAvroSerializer.java:68) > at org.apache.kafka.clients.producer.KafkaProducer.doSend > (KafkaProducer.java:1000) > at org.apache.kafka.clients.producer.KafkaProducer.send > (KafkaProducer.java:947) > at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main > (VersionBasedKafkaProducer.java:507) > at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375) > at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364) > at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0 > (ExecJavaMojo.java:286) > at java.lang.Thread.run (Thread.java:833) > Caused by: > io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: > Error; error code: 50005 > at > io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest > (RestService.java:336) > at > io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest > (RestService.java:409) > at io.confluent.kafka.schemaregistry.client.rest.RestService.getId > (RestService.java:907) > at io.confluent.kafka.schemaregistry.client.rest.RestService.getId > (RestService.java:880) > at > io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry > (CachedSchemaRegistryClient.java:333) > at > io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId > (CachedSchemaRegistryClient.java:464) > at > io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getSchemaBySubjectAndId > (AbstractKafkaSchemaSerDe.java:534) > at > io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId > (AbstractKafkaSchemaSerDe.java:540) > at > io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl > (AbstractKafkaAvroSerializer.java:130) > at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize > (KafkaAvroSerializer.java:68) > at org.apache.kafka.clients.producer.KafkaProducer.doSend > (KafkaProducer.java:1000) > 
at org.apache.kafka.clients.producer.KafkaProducer.send > (KafkaProducer.java:947) > at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main > (VersionBasedKafkaProducer.java:507) > at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375) > at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364) > at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0 > (ExecJavaMojo.java:286) > at java.lang.Thread.run (Thread.java:833) > [INFO] > ------------------------------------------------------------------------ > [INFO] BUILD FAILURE > > ``` > > > In function - verifySchemaAndGetId(), i'm able to get the SchemaId & the > schema details, but when i use the Producer to publish the schema - it is > failing. > > Any idea on this ? > > tia! > > > here is the Stackoverflow link -> > > > > https://stackoverflow.com/questions/79606146/apicurio-schema-registry-3-0-7-producer-failing-in-confluent-compatibility-mod >
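One way to narrow down the failure described above: the stack trace shows the serializer failing inside CachedSchemaRegistryClient.getSchemaBySubjectAndId, while the manual check in main() fetched /schemas/ids/2 with no query string. The two requests may therefore not be identical. The sketch below builds a lookup URL that also carries the subject, so the manual check can be repeated against it. The class name SchemaLookupUrl and its build method are hypothetical, and the query parameter names (fetchMaxId, subject) are an assumption about what the Confluent 7.6.0 client sends; the registry access log is the place to confirm the real request.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original producer code.
final class SchemaLookupUrl {

    // Builds a schema-by-ID lookup URL under the ccompat base path.
    // ASSUMPTION: the serializer adds "fetchMaxId" and "subject" as query
    // parameters; verify against the registry's access log.
    static String build(String baseUrl, int schemaId, String subject) {
        // Drop a single trailing slash so the path is not doubled.
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        StringBuilder url = new StringBuilder(base)
                .append("/schemas/ids/").append(schemaId)
                .append("?fetchMaxId=false");
        if (subject != null) {
            // URL-encode the subject so special characters survive the query string.
            url.append("&subject=")
               .append(URLEncoder.encode(subject, StandardCharsets.UTF_8));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // Values taken from the log output quoted above.
        System.out.println(build(
                "https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7",
                2,
                "com.versa.apicurio.confluent.Employee"));
    }
}
```

Requesting this URL with the same Bearer token as the other checks, and comparing the response code with the plain /schemas/ids/2 call, would show whether the subject-qualified lookup is what returns error 50005.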