well, I was hoping someone in the group would have used apicurio schema registry and would be able to help out (esp since it is used with Kafka) Responses to issues created on apicurio github are very slow, if at all.
thanks, Karan Alang On Sun, May 4, 2025 at 8:04 PM Luke Chen <show...@gmail.com> wrote: > Hi Kara, > > This is obviously not the correct place for this question. > I saw you also asked in Apicurio-registry > <https://github.com/Apicurio/apicurio-registry/issues/6189> repo > <https://github.com/Apicurio/apicurio-registry/issues/6189>. > Please wait for their response there. > > Thanks. > Luke > > > On Mon, May 5, 2025 at 7:18 AM karan alang <karan.al...@gmail.com> wrote: > > > I have Apicurio Schema Registry 3.0.7 installed on GKE. I'm trying to use > > this in Confluent Compatibility mode .. primarily because on production, > my > > Kafka Producer is C++ client, and Apicurio serde is not supported for C++ > > client. > > > > Here is the Java code: > > > > ``` > > > > import com.versa.apicurio.confluent.Employee; > > import com.versa.apicurio.confluent.DepartmentEnum; > > import org.apache.kafka.clients.producer.*; > > import org.apache.kafka.common.serialization.StringSerializer; > > import io.confluent.kafka.serializers.KafkaAvroSerializer; > > > > > > public class VersionBasedKafkaProducer { > > > > private static final String REGISTRY_URL = > > "https://apicurio-sr.vkp.versa-vani.com"; > > private static final String LETS_ENCRYPT_ROOT_URL = > > "https://letsencrypt.org/certs/isrgrootx1.pem"; > > private static final String LETS_ENCRYPT_R11_URL = > > "https://letsencrypt.org/certs/2024/r11.pem"; > > private static final String TRUSTSTORE_PASSWORD = "changeit"; > > private static final String TRUSTSTORE_PATH = > > "apicurio-truststore.jks"; > > > > /** > > * Creates a custom truststore with Let's Encrypt certificates > > * @return The path to the created truststore > > */ > > private static String createCustomTruststore() throws Exception { > > <truncated> > > } > > > > /** > > * Downloads an X.509 certificate from a URL > > */ > > private static X509Certificate downloadCertificate(String certUrl) > > throws Exception { > > <truncated> > > } > > > > /** > > * Get the Access Token for authentication > > */ > > public static String getAccessToken(String tokenUrl, String > > clientId, String clientSecret) throws Exception { > > String params = "grant_type=client_credentials" > > + "&client_id=" + clientId > > + "&client_secret=" + clientSecret; > > URL url = new URL(tokenUrl); > > HttpURLConnection conn = (HttpURLConnection) > url.openConnection(); > > conn.setRequestMethod("POST"); > > conn.setDoOutput(true); > > conn.setRequestProperty("Content-Type", > > "application/x-www-form-urlencoded"); > > try (OutputStream os = conn.getOutputStream()) { > > os.write(params.getBytes(StandardCharsets.UTF_8)); > > } > > try (Scanner scanner = new Scanner(conn.getInputStream(), > > StandardCharsets.UTF_8.name())) { > > String resp = scanner.useDelimiter("\\A").next(); > > return resp.split("\"access_token\":\"")[1].split("\"")[0]; > > } > > } > > > > apiv > > > > /** > > * Verify schema exists and get information about it, returning the > schema > > ID > > */ > > private static int verifySchemaAndGetId(String token) throws Exception { > > System.out.println("\n=== Verifying schema in registry ==="); > > > > // Subject name with RecordNameStrategy > > String subject = "com.versa.apicurio.confluent.Employee"; > > > > // Check if the subject exists > > String subjectsUrl = String.format("%s/apis/ccompat/v7/subjects", > > REGISTRY_URL); > > System.out.println("Checking all subjects at: " + subjectsUrl); > > > > URL url = new URL(subjectsUrl); > > HttpURLConnection conn = (HttpURLConnection) url.openConnection(); > > conn.setRequestMethod("GET"); > > conn.setRequestProperty("Authorization", "Bearer " + token); > > > > int responseCode = conn.getResponseCode(); > > System.out.println("Response Code: " + responseCode); > > > > boolean subjectExists = false; > > > > if (responseCode == 200) { > > try (Scanner scanner = new Scanner(conn.getInputStream(), > > StandardCharsets.UTF_8.name())) { > > String resp = scanner.useDelimiter("\\A").next(); > > System.out.println("All subjects: " + resp); > > > > // Check if our subject is in the list > > subjectExists = resp.contains("\"" + subject + "\""); > > System.out.println("Subject '" + subject + "' exists: " + > > subjectExists); > > } > > } else { > > System.out.println("Failed to retrieve subjects list. Response > > code: " + responseCode); > > return -1; > > } > > > > if (!subjectExists) { > > System.out.println("Subject '" + subject + "' does not exist > > in the registry!"); > > return -1; > > } > > > > // Get versions > > String versionsUrl = > > String.format("%s/apis/ccompat/v7/subjects/%s/versions", REGISTRY_URL, > > subject); > > System.out.println("Fetching versions at: " + versionsUrl); > > > > URL versionsUrlObj = new URL(versionsUrl); > > HttpURLConnection versionsConn = (HttpURLConnection) > > versionsUrlObj.openConnection(); > > versionsConn.setRequestMethod("GET"); > > versionsConn.setRequestProperty("Authorization", "Bearer " + token); > > > > int versionsResponseCode = versionsConn.getResponseCode(); > > String highestVersion = "1"; > > > > if (versionsResponseCode == 200) { > > try (Scanner scanner = new > > Scanner(versionsConn.getInputStream(), StandardCharsets.UTF_8.name())) > > { > > String resp = scanner.useDelimiter("\\A").next(); > > System.out.println("Available versions: " + resp); > > > > if (resp.equals("[]") || resp.trim().isEmpty()) { > > System.out.println("Warning: No versions found for > > this subject!"); > > return -1; > > } else { > > // Find the highest version number > > Pattern pattern = Pattern.compile("\\d+"); > > Matcher matcher = pattern.matcher(resp); > > int maxVersion = 0; > > while (matcher.find()) { > > try { > > int version = Integer.parseInt(matcher.group()); > > if (version > maxVersion) { > > maxVersion = version; > > highestVersion = matcher.group(); > > } > > } catch (NumberFormatException e) { > > // Skip if not a valid number > > } > > } > > System.out.println("Highest version found: " + > > highestVersion); > > } > > } > > } else { > > System.out.println("Failed to list versions. Response code: " > > + versionsResponseCode); > > return -1; > > } > > > > // Get the specific version > > String versionUrl = > > String.format("%s/apis/ccompat/v7/subjects/%s/versions/%s", > > REGISTRY_URL, subject, highestVersion); > > System.out.println("Fetching schema at: " + versionUrl); > > > > URL specificVersionUrl = new URL(versionUrl); > > HttpURLConnection versionConn = (HttpURLConnection) > > specificVersionUrl.openConnection(); > > versionConn.setRequestMethod("GET"); > > versionConn.setRequestProperty("Authorization", "Bearer " + token); > > > > int versionResponseCode = versionConn.getResponseCode(); > > int schemaId = -1; > > > > if (versionResponseCode == 200) { > > try (Scanner scanner = new > > Scanner(versionConn.getInputStream(), StandardCharsets.UTF_8.name())) > > { > > String resp = scanner.useDelimiter("\\A").next(); > > System.out.println("Version " + highestVersion + " schema > > (raw): " + resp); > > > > // Extract schema ID > > if (resp.contains("\"id\":")) { > > String idStr = > > resp.split("\"id\":")[1].split(",")[0].trim(); > > try { > > schemaId = Integer.parseInt(idStr); > > System.out.println("Schema ID: " + schemaId); > > } catch (NumberFormatException e) { > > System.out.println("Failed to parse schema ID: " + > > idStr); > > return -1; > > } > > } > > > > // Extract and pretty print the schema > > try { > > String schemaJson = > > resp.split("\"schema\":\"")[1].split("\",\"schemaType\"")[0]; > > // Replace escaped quotes with regular quotes > > schemaJson = schemaJson.replace("\\\"", "\""); > > // Replace escaped backslashes with regular backslashes > > schemaJson = schemaJson.replace("\\\\", "\\"); > > System.out.println("\nSchema content (prettified):\n" > > + schemaJson); > > } catch (Exception e) { > > System.out.println("Failed to prettify schema: " + > > e.getMessage()); > > } > > } > > return schemaId; > > } else { > > System.out.println("Failed to get version. Response code: " + > > versionResponseCode); > > return -1; > > } > > } > > > > public static void main(String[] args) throws Exception { > > // Create truststore for SSL > > String truststorePath = createCustomTruststore(); > > > > // Get token > > String tokenUrl = > > " > > > https://keycloak.vkp.versa-vani.com/realms/readonly-realm/protocol/openid-connect/token > > "; > > String clientId = "apicurio-registry"; > > String clientSecret = "<secret>"; > > String token = getAccessToken(tokenUrl, clientId, clientSecret); > > System.out.println("Got access token: " + token.substring(0, 15) + > > "..."); > > > > // Get schema ID during verification > > int schemaId = verifySchemaAndGetId(token); > > // System.out.println("schemaId", schemaId); > > if (schemaId <= 0) { > > System.out.println("Failed to get valid schema ID. Exiting."); > > return; > > } > > > > // Add direct schema ID check > > String schemaIdUrl = > > String.format("%s/apis/ccompat/v7/schemas/ids/%d", REGISTRY_URL, > > schemaId); > > System.out.println("\n=== Testing direct schema retrieval by ID > ==="); > > System.out.println("Testing URL: " + schemaIdUrl); > > > > URL schemaIdUrlObj = new URL(schemaIdUrl); > > HttpURLConnection schemaIdConn = (HttpURLConnection) > > schemaIdUrlObj.openConnection(); > > schemaIdConn.setRequestMethod("GET"); > > schemaIdConn.setRequestProperty("Authorization", "Bearer " + token); > > > > int schemaIdResponseCode = schemaIdConn.getResponseCode(); > > System.out.println("Schema ID check response code: " + > > schemaIdResponseCode); > > > > if (schemaIdResponseCode == 200) { > > try (Scanner scanner = new > > Scanner(schemaIdConn.getInputStream(), StandardCharsets.UTF_8.name())) > > { > > String resp = scanner.useDelimiter("\\A").next(); > > System.out.println("Schema by ID response: " + resp); > > } > > } else { > > System.out.println("Failed to get schema by ID. Response code: > > " + schemaIdResponseCode); > > } > > > > // Set up Kafka producer > > Properties props = new Properties(); > > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP>:<port>"); > > props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > > StringSerializer.class.getName()); > > props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > > KafkaAvroSerializer.class.getName()); > > > > // Schema Registry config - Updated with dynamic schema ID > > props.put("schema.registry.url", REGISTRY_URL + "/apis/ccompat/v7"); > > props.put("bearer.auth.credentials.source", "USER_INFO"); > > props.put("bearer.auth.token", token); > > props.put("value.subject.name.strategy", > > "io.confluent.kafka.serializers.subject.RecordNameStrategy"); > > props.put("auto.register.schemas", "false"); > > props.put("apicurio.registry.as-confluent", "true"); > > props.put("apicurio.registry.use-id", "contentId"); > > > > // props.put("value.subject.name.strategy", > > "io.confluent.kafka.serializers.subject.TopicNameStrategy"); > > // props.put("auto.register.schemas", "true"); // > > > > // Use the dynamically retrieved schema ID > > props.put("use.schema.id", String.valueOf(schemaId)); > > > > // Additional helpful configuration > > props.put("specific.avro.reader", "true"); > > props.put("use.latest.version", "true"); > > > > // SSL settings for Kafka > > props.put("ssl.truststore.location", > > > > > "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc-tf/versa-kafka-poc-tf-cluster-ca-cert/versa-kafka-poc-tf-ca.p12"); > > props.put("ssl.truststore.password", "pwd"); > > props.put("ssl.truststore.type", "PKCS12"); > > props.put("ssl.keystore.location", > > > > > "/Users/karanalang/Documents/Technology/0.ACME/strimzi-certs/versa-kafka-poc/syslog-vani-prefix/syslog-vani-prefix.p12"); > > props.put("ssl.keystore.password", "pwd"); > > props.put("ssl.keystore.type", "PKCS12"); > > props.put("security.protocol", "SSL"); > > > > // Schema Registry SSL settings > > props.put("schema.registry.ssl.truststore.location", truststorePath); > > props.put("schema.registry.ssl.truststore.password", > > TRUSTSTORE_PASSWORD); > > props.put("schema.registry.ssl.truststore.type", "JKS"); > > props.put("schema.registry.ssl.endpoint.identification.algorithm", > > "HTTPS"); > > > > props.put("schema.reflection", "false"); > > props.put("specific.avro.reader", "true"); > > props.put("schema.registry.group.id", "default"); > > > > // Add debugging parameter > > props.put("debug", "all"); > > > > // For Apicurio 3.0.7, schema may be under a group > > props.put("schema.registry.group.id", "default"); > > > > // Enhanced debugging > > > > > System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.serializers", > > "TRACE"); > > > > > System.setProperty("org.slf4j.simpleLogger.log.io.confluent.kafka.schemaregistry", > > "TRACE"); > > > > // Using your original topic > > String topic = "<topic>"; > > > > // Update to your main method where you create and send messages > > try (Producer<String, Employee> producer = new KafkaProducer<>(props)) { > > System.out.println("Producer created successfully. Sending > > messages to: " + topic); > > > > System.out.println("Sending employee records..."); > > > > // Create employees with null values for new_col2 and new_col3 > > Employee[] employees = { > > Employee.newBuilder() > > .setId(1) > > .setName("John Doe") > > .setAge(30) > > .setSalary(75000.0f) > > .setDepartment(DepartmentEnum.ENGINEERING) > > .setEmail(null) > > .setNewCol(null) > > .setNewCol2(null) > > .setNewCol3(null) > > .build(), > > Employee.newBuilder() > > .setId(2) > > .setName("Jane Smith") > > .setAge(28) > > .setSalary(85000.0f) > > .setDepartment(DepartmentEnum.HR) > > .setEmail(null) > > .setNewCol(null) > > .setNewCol2(null) > > .setNewCol3(null) > > .build(), > > Employee.newBuilder() > > .setId(3) > > .setName("Bob Johnson") > > .setSalary(65000.0f) > > .setDepartment(DepartmentEnum.SALES) > > .setAge(null) > > .setEmail(null) > > .setNewCol(null) > > .setNewCol2(null) > > .setNewCol3(null) > > .build() > > }; > > > > // Send each record > > for (Employee emp : employees) { > > System.out.println("Sending record for " + emp.getName() + " > > with ID: " + emp.getId()); > > ProducerRecord<String, Employee> record = new > > ProducerRecord<>(topic, String.valueOf(emp.getId()), emp); > > > > // Use asynchronous sending with a callback instead of > synchronous > > get() > > producer.send(record, new Callback() { > > @Override > > public void onCompletion(RecordMetadata metadata, Exception > e) > > { > > if (e != null) { > > System.err.println("Error sending record for " + > > emp.getName() + ":"); > > e.printStackTrace(); > > } else { > > System.out.println("Successfully sent " + > > emp.getName() + " to partition " + > > metadata.partition() + " at offset " + > > metadata.offset()); > > } > > } > > }); > > > > // Small delay to see logs clearly > > Thread.sleep(100); > > } > > > > // Ensure all records are sent before closing producer > > producer.flush(); > > System.out.println("All messages sent successfully"); > > } > > > > ``` > > > > here is the error : > > > > ``` > > > > (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ > > (base) Karans-MacBook-Pro:apicurio_confluent karanalang$ mvn exec:java > > -Dexec.mainClass="com.versa.apicurio.confluent.VersionBasedKafkaProducer" > > -Dlog4j.debug > > [INFO] Scanning for projects... > > [INFO] > > [INFO] ----------< com.versa.apicurio.confluent:apicurio_confluent > > >----------- > > [INFO] Building apicurio_confluent 1.0-SNAPSHOT > > [INFO] --------------------------------[ jar > > ]--------------------------------- > > [INFO] > > [INFO] --- exec-maven-plugin:3.5.0:java (default-cli) @ > apicurio_confluent > > --- > > Creating custom truststore with Let's Encrypt certificates... > > Downloading Let's Encrypt Root certificate... > > Downloading Let's Encrypt R11 certificate... > > Custom truststore created at: > > > > > /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks > > Got access token: eyJhbGciOiJSUzI... > > > > === Verifying schema in registry === > > Checking all subjects at: > > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects > > Response Code: 200 > > All subjects: ["com.versa.apicurio.confluent.Employee"] > > Subject 'com.versa.apicurio.confluent.Employee' exists: true > > Fetching versions at: > > > > > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions > > Available versions: [1,2,3,4] > > Highest version found: 4 > > Fetching schema at: > > > > > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/subjects/com.versa.apicurio.confluent.Employee/versions/4 > > Version 4 schema (raw): > > > > > {"subject":"com.versa.apicurio.confluent.Employee","version":4,"id":2,"schema":"{\"type\": > > \"record\", \"name\": \"Employee\", \"namespace\": > > \"com.versa.apicurio.confluent\", \"fields\": [{\"name\": \"id\", > > \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, > > {\"name\": \"salary\", \"type\": [\"null\", \"float\"], \"default\": > > null}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": > > null}, {\"name\": \"department\", \"type\": {\"type\": \"enum\", > > \"name\": \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\", > > \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"], > > \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\", > > \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\": > > [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\", > > \"type\": [\"null\", \"string\"], \"default\": > > null}]}","schemaType":"AVRO","references":[]} > > Schema ID: 2 > > > > Schema content (prettified): > > {"type": "record", "name": "Employee", "namespace": > > "com.versa.apicurio.confluent", "fields": [{"name": "id", "type": > > "int"}, {"name": "name", "type": "string"}, {"name": "salary", "type": > > ["null", "float"], "default": null}, {"name": "age", "type": ["null", > > "int"], "default": null}, {"name": "department", "type": {"type": > > "enum", "name": "DepartmentEnum", "symbols": ["HR", "ENGINEERING", > > "SALES"]}}, {"name": "email", "type": ["null", "string"], "default": > > null}, {"name": "new_col", "type": ["null", "string"], "default": > > null}, {"name": "new_col2", "type": ["null", "string"], "default": > > null}, {"name": "new_col3", "type": ["null", "string"], "default": > > null}]} > > > > === Testing direct schema retrieval by ID === > > Testing URL: > > https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7/schemas/ids/2 > > Schema ID check response code: 200 > > Schema by ID response: {"schema":"{\"type\": \"record\", \"name\": > > \"Employee\", \"namespace\": \"com.versa.apicurio.confluent\", > > \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": > > \"name\", \"type\": \"string\"}, {\"name\": \"salary\", \"type\": > > [\"null\", \"float\"], \"default\": null}, {\"name\": \"age\", > > \"type\": [\"null\", \"int\"], \"default\": null}, {\"name\": > > \"department\", \"type\": {\"type\": \"enum\", \"name\": > > \"DepartmentEnum\", \"symbols\": [\"HR\", \"ENGINEERING\", > > \"SALES\"]}}, {\"name\": \"email\", \"type\": [\"null\", \"string\"], > > \"default\": null}, {\"name\": \"new_col\", \"type\": [\"null\", > > \"string\"], \"default\": null}, {\"name\": \"new_col2\", \"type\": > > [\"null\", \"string\"], \"default\": null}, {\"name\": \"new_col3\", > > \"type\": [\"null\", \"string\"], \"default\": > > null}]}","schemaType":"AVRO","references":[]} > > SLF4J: Class path contains multiple SLF4J bindings. > > SLF4J: Found binding in > > > > > [jar:file:/Users/karanalang/.m2/repository/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: Found binding in > > > > > [jar:file:/Users/karanalang/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > > explanation. > > log4j: Trying to find [log4j.xml] using context classloader > > > org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d. > > log4j: Trying to find [log4j.xml] using > > org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d > > class loader. > > log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource(). > > log4j: Trying to find [log4j.properties] using context classloader > > > org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader@34fe326d. > > log4j: Using URL > > > > > [jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties] > > for automatic log4j configuration. > > log4j: Reading configuration from URL > > > > > jar:file:/Users/karanalang/.m2/repository/io/confluent/kafka-schema-registry/7.6.0/kafka-schema-registry-7.6.0.jar!/log4j.properties > > log4j: Parsing for [root] with value=[INFO, stdout]. > > log4j: Level token is [INFO]. > > log4j: Category root set to INFO > > log4j: Parsing appender named "stdout". > > log4j: Parsing layout options for "stdout". > > log4j: Setting property [conversionPattern] to [[%d] %p %m (%c:%L)%n]. > > log4j: End of parsing for "stdout". > > log4j: Parsed "stdout" options. > > log4j: Parsing for [org.apache.directory] with value=[ERROR, stdout]. > > log4j: Level token is [ERROR]. > > log4j: Category org.apache.directory set to ERROR > > log4j: Parsing appender named "stdout". > > log4j: Appender "stdout" was already parsed. > > log4j: Handling log4j.additivity.org.apache.directory=[null] > > log4j: Parsing for [org.apache.zookeeper] with value=[ERROR, stdout]. > > log4j: Level token is [ERROR]. > > log4j: Category org.apache.zookeeper set to ERROR > > log4j: Parsing appender named "stdout". > > log4j: Appender "stdout" was already parsed. > > log4j: Handling log4j.additivity.org.apache.zookeeper=[null] > > log4j: Parsing for [org.apache.kafka] with value=[ERROR, stdout]. > > log4j: Level token is [ERROR]. > > log4j: Category org.apache.kafka set to ERROR > > log4j: Parsing appender named "stdout". > > log4j: Appender "stdout" was already parsed. > > log4j: Handling log4j.additivity.org.apache.kafka=[null] > > log4j: Parsing for [kafka] with value=[ERROR, stdout]. > > log4j: Level token is [ERROR]. > > log4j: Category kafka set to ERROR > > log4j: Parsing appender named "stdout". > > log4j: Appender "stdout" was already parsed. > > log4j: Handling log4j.additivity.kafka=[null] > > log4j: Finished configuring. > > SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] > > [2025-05-04 15:37:20,915] INFO KafkaAvroSerializerConfig values: > > auto.register.schemas = false > > avro.reflection.allow.null = false > > avro.remove.java.properties = false > > avro.use.logical.type.converters = false > > basic.auth.credentials.source = URL > > basic.auth.user.info = [hidden] > > bearer.auth.cache.expiry.buffer.seconds = 300 > > bearer.auth.client.id = null > > bearer.auth.client.secret = null > > bearer.auth.credentials.source = USER_INFO > > bearer.auth.custom.provider.class = null > > bearer.auth.identity.pool.id = null > > bearer.auth.issuer.endpoint.url = null > > bearer.auth.logical.cluster = null > > bearer.auth.scope = null > > bearer.auth.scope.claim.name = scope > > bearer.auth.sub.claim.name = sub > > bearer.auth.token = [hidden] > > context.name.strategy = class > > io.confluent.kafka.serializers.context.NullContextNameStrategy > > http.connect.timeout.ms = 60000 > > http.read.timeout.ms = 60000 > > id.compatibility.strict = true > > key.subject.name.strategy = class > > io.confluent.kafka.serializers.subject.TopicNameStrategy > > latest.cache.size = 1000 > > latest.cache.ttl.sec = -1 > > latest.compatibility.strict = true > > max.schemas.per.subject = 1000 > > normalize.schemas = false > > proxy.host = > > proxy.port = -1 > > rule.actions = [] > > rule.executors = [] > > rule.service.loader.enable = true > > schema.format = null > > schema.reflection = false > > schema.registry.basic.auth.user.info = [hidden] > > schema.registry.ssl.cipher.suites = null > > schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3] > > schema.registry.ssl.endpoint.identification.algorithm = HTTPS > > schema.registry.ssl.engine.factory.class = null > > schema.registry.ssl.key.password = null > > schema.registry.ssl.keymanager.algorithm = SunX509 > > schema.registry.ssl.keystore.certificate.chain = null > > schema.registry.ssl.keystore.key = null > > schema.registry.ssl.keystore.location = null > > schema.registry.ssl.keystore.password = null > > schema.registry.ssl.keystore.type = JKS > > schema.registry.ssl.protocol = TLSv1.3 > > schema.registry.ssl.provider = null > > schema.registry.ssl.secure.random.implementation = null > > schema.registry.ssl.trustmanager.algorithm = PKIX > > schema.registry.ssl.truststore.certificates = null > > schema.registry.ssl.truststore.location = > > > > > /Users/karanalang/Documents/Technology/apicurio-schema-registry/SchemaRegistry/apicurio_confluent/apicurio-truststore.jks > > schema.registry.ssl.truststore.password = [hidden] > > schema.registry.ssl.truststore.type = JKS > > schema.registry.url = > > [https://apicurio-sr.vkp.versa-vani.com/apis/ccompat/v7] > > use.latest.version = true > > use.latest.with.metadata = null > > use.schema.id = 2 > > value.subject.name.strategy = class > > io.confluent.kafka.serializers.subject.RecordNameStrategy > > (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:370) > > Producer created successfully. Sending messages to: > > syslog.ueba-nov.v1.nov.nov > > Sending employee records... > > Sending record for John Doe with ID: 1 > > [WARNING] > > org.apache.kafka.common.errors.SerializationException: Error > > retrieving schema > > > > > ID{"type":"record","name":"Employee","namespace":"com.versa.apicurio.confluent","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"salary","type":["null","float"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"department","type":{"type":"enum","name":"DepartmentEnum","symbols":["HR","ENGINEERING","SALES"]}},{"name":"email","type":["null","string"],"default":null},{"name":"new_col","type":["null","string"],"default":null},{"name":"new_col2","type":["null","string"],"default":null},{"name":"new_col3","type":["null","string"],"default":null}]} > > at > > io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException > > (AbstractKafkaSchemaSerDe.java:809) > > at > > io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl > > (AbstractKafkaAvroSerializer.java:176) > > at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize > > (KafkaAvroSerializer.java:68) > > at org.apache.kafka.clients.producer.KafkaProducer.doSend > > (KafkaProducer.java:1000) > > at org.apache.kafka.clients.producer.KafkaProducer.send > > (KafkaProducer.java:947) > > at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main > > (VersionBasedKafkaProducer.java:507) > > at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375) > > at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364) > > at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0 > > (ExecJavaMojo.java:286) > > at java.lang.Thread.run (Thread.java:833) > > Caused by: > > > io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: > > Error; error code: 50005 > > at > > io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest > > (RestService.java:336) > > at > > io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest > > (RestService.java:409) > > at io.confluent.kafka.schemaregistry.client.rest.RestService.getId > > (RestService.java:907) > > at io.confluent.kafka.schemaregistry.client.rest.RestService.getId > > (RestService.java:880) > > at > > > io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry > > (CachedSchemaRegistryClient.java:333) > > at > > > io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId > > (CachedSchemaRegistryClient.java:464) > > at > > > io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getSchemaBySubjectAndId > > (AbstractKafkaSchemaSerDe.java:534) > > at > > > io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId > > (AbstractKafkaSchemaSerDe.java:540) > > at > > io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl > > (AbstractKafkaAvroSerializer.java:130) > > at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize > > (KafkaAvroSerializer.java:68) > > at org.apache.kafka.clients.producer.KafkaProducer.doSend > > (KafkaProducer.java:1000) > > at org.apache.kafka.clients.producer.KafkaProducer.send > > (KafkaProducer.java:947) > > at com.versa.apicurio.confluent.VersionBasedKafkaProducer.main > > (VersionBasedKafkaProducer.java:507) > > at org.codehaus.mojo.exec.ExecJavaMojo.doMain (ExecJavaMojo.java:375) > > at org.codehaus.mojo.exec.ExecJavaMojo.doExec (ExecJavaMojo.java:364) > > at org.codehaus.mojo.exec.ExecJavaMojo.lambda$execute$0 > > (ExecJavaMojo.java:286) > > at java.lang.Thread.run (Thread.java:833) > > [INFO] > > ------------------------------------------------------------------------ > > [INFO] BUILD FAILURE > > > > ``` > > > > > > In function - verifySchemaAndGetId(), i'm able to get the SchemaId & the > > schema details, but when i use the Producer to publish the schema - it is > > failing. > > > > Any idea on this ? > > > > tia! > > > > > > here is the Stackoverflow link -> > > > > > > > > > https://stackoverflow.com/questions/79606146/apicurio-schema-registry-3-0-7-producer-failing-in-confluent-compatibility-mod > > >