Hi I’m trying to send logs to Kafka via log4j2. The Topic I’m sending to already has a Schema and I cannot find a way for the KafkaAppender to get the Schema Id as it always passes the string “bytes” to the Schema Registry.
In (1) KafkaAppender.tryAppend a byte[] is created representing the LogEvent to be sent to Kafka. It is always a byte[], never a subclass of GenericContainer. Further down the call chain the Schema string is obtained by calling (2) AvroSchemaUtils.getSchema(). This returns “bytes” as the object passed in is the byte[] from tryAppend. This is then used in a call to the Schema Registry which expects a string representation of the Schema and not the string "bytes". I haven’t found a way to set the Schema Id for the KafkaAppender or have the appender use a GenericContainer subclass of my choosing. Is either possible or is there another way? Thanks Michael 1) Creating the byte[] to be sent to Kafka: private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException { final Layout<? extends Serializable> layout = getLayout(); byte[] data; if (layout instanceof SerializedLayout) { final byte[] header = layout.getHeader(); final byte[] body = layout.toByteArray(event); data = new byte[header.length + body.length]; System.arraycopy(header, 0, data, 0, header.length); System.arraycopy(body, 0, data, header.length, body.length); } else { data = layout.toByteArray(event); } manager.send(data); } 2) Getting the Schema string to pass to the Schema Registry to get the Schema Id: public static Schema getSchema(Object object) { if (object == null) { return (Schema)primitiveSchemas.get("Null"); } else if (object instanceof Boolean) { return (Schema)primitiveSchemas.get("Boolean"); } else if (object instanceof Integer) { return (Schema)primitiveSchemas.get("Integer"); } else if (object instanceof Long) { return (Schema)primitiveSchemas.get("Long"); } else if (object instanceof Float) { return (Schema)primitiveSchemas.get("Float"); } else if (object instanceof Double) { return (Schema)primitiveSchemas.get("Double"); } else if (object instanceof CharSequence) { return (Schema)primitiveSchemas.get("String"); } else if (object instanceof byte[]) { return (Schema)primitiveSchemas.get("Bytes"); } else if (object instanceof GenericContainer) { return ((GenericContainer)object).getSchema(); } else { throw new IllegalArgumentException("Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord"); } }