Hi,All:
I use Kafka Source to read events from one Kafka topic and write events to
another Topic with Kafka Sink,
the Kafka Sink topic configuration is not work, flume still write events to
Kafka source Topic (sourceTopic).
agent_myAgent.sources.kafkaSource.topic = sourceTopic
agent_myAgent.sinks.kafkaSink.topic = sinkTopic
SourceCode in "org.apache.flume.source.kafka.KafkaSource.process()" :
// Add headers to event (topic, timestamp, and key)
headers = new HashMap<String, String>();
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
and in Kafka Sink properties:
The topic in Kafka to which the messages will be published. If this parameter
is configured, messages will be published to this topic. If the event header
contains a ??topic?? field, the event will be published to that topic
overriding the topic configured here.
SourceCode in "org.apache.flume.sink.kafka.KafkaSink.process()":
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
In my case, how I can fix this problem?
Flume version is 1.6.0.
Thanks!