Hello,

I'm experiencing a problem using Kafka Connect's JdbcSinkConnector. I'm
creating two connectors using the following script: `./create-connector.sh
test` and `./create-connector.sh test2`.

The first one, `test`, works; the second one, `test2`, does not. That is,
the first connector successfully copies data into Postgres, while the second
fails with the error message below. The only difference between `test` and
`test2` is that the second topic is produced by:
`test.map(...).through(.., .., "test2")`.

*Error*
{
  "name": "test2",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "state": "FAILED",
      "trace": "org.apache.kafka.connect.errors.DataException: Failed to
deserialize data to Avro: \n\tat
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat
java.lang.Thread.run(Thread.java:745)\nCaused by:
org.apache.kafka.common.errors.SerializationException: Error deserializing
Avro message for id -1\nCaused by:
org.apache.kafka.common.errors.SerializationException: Unknown magic
byte!\n",
      "id": 0,
      "worker_id": "localhost:8083"
    }
  ]
}

*create-connector.sh*
#! /bin/bash
# Creates a JDBC sink connector via the Kafka Connect REST API.
# Usage: ./create-connector.sh <name>
# The connector name and the topic it sinks are both taken from $1.
NAME=$1
TOPICS=$1

# create the JDBC sink connector.
# NOTE: the Connect REST resource is /connectors (plural); POSTing to
# /connector returns 404. Variables are double-quoted inside the single-
# quoted JSON so names containing shell metacharacters expand safely.
# NOTE(review): "poll.interval.ms" is a source-connector setting and is
# ignored by the JDBC *sink* -- harmless, but likely unintended.
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{"name": "'"$NAME"'", "config": { "connector.class":
"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": 1,
"connection.url":
"jdbc:postgresql://localhost:7999/kafka?user=postgres&password=somepassword33",
"topics": "'"$TOPICS"'", "poll.interval.ms": 1000, "auto.create": true,
"auto.evolve": true } }' \
  http://localhost:8083/connectors

*connect server*
# Run the Confluent Kafka Connect worker on the host network, using Avro
# converters backed by the local Schema Registry.
# FIX: the pasted command had a stray ';' after the two schema-registry
# URLs ( ...8081"; \ ) -- that semicolon ends the shell command early and
# orphans every flag after it, so the container never got those settings.
# NOTE(review): key.converter is AvroConverter, so every sunk topic must
# carry Confluent-Avro-framed keys (magic byte + schema id); a topic with
# plain String keys will fail with "Unknown magic byte!".
docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS=localhost:9092 \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="logistics" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="logistics-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="logistics-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="logistics-status" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
  confluentinc/cp-kafka-connect:latest

*Producer*
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081";);

String topic = "test";

Producer<String, test> producer = new KafkaProducer<>(props);

while (true) {
test command = CommandGenerator.getNextTest();
System.out.println("Generated event " + command.toString());

ProducerRecord<String, test> record = new ProducerRecord<>(topic,
UUID.randomUUID().toString(), command);
producer.send(record);
Thread.sleep(500);
}
}

*Consumer*
// Kafka Streams topology: read Avro "test" records, remap each one to a
// fresh random String key (mutating the command's id to 9 along the way),
// and republish to "test2".
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
    "create-order");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
    "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
    "localhost:2181");
// FIX: the original line had a stray ';' inside the call
// ("http://localhost:8081";) -- a mail-client artifact that does not compile.
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    "http://localhost:8081");

final KStream<String, test> tests = builder.stream(Serdes.String(),
    testSpecificAvroSerde, "test");

tests.map((id, command) -> {
    System.out.println("test id=" + id + " command=" + command);
    // NOTE(review): mutating the incoming record in-place; safer to copy
    // before modifying if the upstream value is shared.
    command.setId(9);

    return new KeyValue<>(UUID.randomUUID().toString(), command);
})
    // NOTE(review): likely root cause of the sink failure -- "test2" keys
    // are written with Serdes.String(), i.e. raw UTF-8 with no Confluent
    // magic byte / schema id, but the Connect worker's key.converter is
    // AvroConverter. That mismatch produces exactly the
    // "Unknown magic byte!" error for the test2 connector. Either write
    // keys with an Avro serde here, or override "key.converter" to
    // org.apache.kafka.connect.storage.StringConverter in the test2
    // connector config.
    .through(Serdes.String(), testSpecificAvroSerde, "test2");


*test.avsc*
{
  "type": "record",
  "namespace": "com.foodpanda.command.avro",
  "name": "test",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "product", "type": "string" },
    { "name": "quantity", "type": "int" },
    { "name": "price", "type": "float" }
  ]
}

-- 

Nick DeCoursin
Software Engineer
foodpanda

Tel | +1 920 450 5434

Mail | n.decour...@foodpanda.com

Skype | nick.foodpanda

Foodpanda GmbH | Schreiberhauer Str. 30 | 10317 Berlin | Germany
Sitz der Gesellschaft | Berlin, AG Charlottenburg | HRB 138224 B |
USt-ID-Nr | DE 283789080
Geschäftsführer | Benjamin Bauer, Felix Plog, Ralf Wenzel

CONFIDENTIALITY NOTICE: This message (including any attachments) is
confidential and may be privileged. It may be read, copied and used only by
the intended recipient. If you have received it in error please contact the
sender (by return e-mail) immediately and delete this message. Any
unauthorized use or dissemination of this message in whole or in parts is
strictly prohibited.

Reply via email to