[ https://issues.apache.org/jira/browse/KAFKA-3816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645101#comment-16645101 ]

Randall Hauch commented on KAFKA-3816:
--------------------------------------

The above pull request makes minimal changes to Connect to set four MDC 
parameters:

# {{connector.class}} - the shortened name of the connector class (e.g., "MySink" for a class named "MySinkConnector")
# {{connector.name}} - the name of the connector
# {{connector.scope}} - the scope of the current thread, with values like "{{worker}}" within the worker thread, "{{connector}}" within the connector thread, "{{task}}" within the task worker (and task implementation), and "{{offsets}}" in the scheduled thread that commits offsets for source connectors
# {{connector.task}} - the task number within a task worker thread, and unset elsewhere (a sketch of how such keys are set per thread follows this list)
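
For reference, here is a minimal sketch of the underlying mechanism, using 
SLF4J's MDC API directly. The key names match the list above, but the class 
and the values are hypothetical stand-ins for illustration, not code from the 
pull request:

{noformat}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcSketch {
    private static final Logger log = LoggerFactory.getLogger(MdcSketch.class);

    public static void main(String[] args) {
        // Hypothetical values; the worker would set these when it starts a
        // connector or task thread.
        MDC.put("connector.class", "MySink");
        MDC.put("connector.name", "local-file-source");
        MDC.put("connector.scope", "task");
        MDC.put("connector.task", "0");
        try {
            // Every message logged from this thread now carries the four
            // values, which the %X{...} placeholders in the Log4j
            // ConversionPattern render.
            log.info("Creating task local-file-source-0");
        } finally {
            // Remove the keys so a reused or pooled thread does not leak
            // stale context into unrelated log messages.
            MDC.remove("connector.class");
            MDC.remove("connector.name");
            MDC.remove("connector.scope");
            MDC.remove("connector.task");
        }
    }
}
{noformat}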

These parameters must be explicitly referenced within the 
{{connect-log4j.properties}} file, and are not enabled by default in order to 
maintain backward compatibility. If that file is modified to use:

{noformat}
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.name}|%X{connector.scope}%X{connector.task} %m (%c:%L)%n
{noformat}

the resulting log messages will include the connector name, the task number, 
and the scope (this pattern omits the connector class name, which seems overly 
verbose). Here's an example of some log statements using the above conversion 
pattern, where the connector name is "local-file-source", the task number is 
"0", and the scope is "connector", "task", or "offsets" (depending upon where 
the log messages are called). When these parameters are not set (in other 
contexts), the log messages show a blank value for them.

{noformat}
[2018-10-04 17:49:37,068] INFO | Kafka version : 2.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-10-04 17:49:37,068] INFO | Kafka commitId : 00a7bd8b636c184e (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-10-04 17:49:37,222] INFO | Kafka cluster ID: bpHT_pLdRR-Y7HKEaEdHFA (org.apache.kafka.connect.util.ConnectUtils:59)
[2018-10-04 17:49:37,239] INFO | Logging initialized @2093ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193)
[2018-10-04 17:49:37,282] INFO | Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:119)
[2018-10-04 17:49:37,302] INFO | Advertised URI: http://10.0.1.6:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:267)
...
[2018-10-04 17:49:38,087] INFO local-file-source|connector Creating connector local-file-source of type FileStreamSource (org.apache.kafka.connect.runtime.Worker:238)
[2018-10-04 17:49:38,090] INFO local-file-source|connector Instantiated connector local-file-source with version 2.1.0-SNAPSHOT of type class org.apache.kafka.connect.file.FileStreamSourceConnector (org.apache.kafka.connect.runtime.Worker:241)
...
[2018-10-04 17:49:38,093] INFO local-file-source|task0 Creating task local-file-source-0 (org.apache.kafka.connect.runtime.Worker:404)
...
[2018-10-04 17:49:38,097] INFO local-file-source|task0 Instantiated task local-file-source-0 with version 2.1.0-SNAPSHOT of type org.apache.kafka.connect.file.FileStreamSourceTask (org.apache.kafka.connect.runtime.Worker:419)
[2018-10-04 17:49:38,097] INFO local-file-source|task0 JsonConverterConfig values: 
        converter.type = key
        schemas.cache.size = 1000
        schemas.enable = true
 (org.apache.kafka.connect.json.JsonConverterConfig:279)
[2018-10-04 17:49:38,098] INFO local-file-source|task0 Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task local-file-source-0 using the worker config (org.apache.kafka.connect.runtime.Worker:442)
[2018-10-04 17:49:38,098] INFO local-file-source|task0 JsonConverterConfig values: 
        converter.type = value
        schemas.cache.size = 1000
        schemas.enable = true
 (org.apache.kafka.connect.json.JsonConverterConfig:279)
[2018-10-04 17:49:38,098] INFO local-file-source|task0 Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task local-file-source-0 using the worker config (org.apache.kafka.connect.runtime.Worker:448)
[2018-10-04 17:49:38,098] INFO local-file-source|task0 Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task local-file-source-0 using the worker config (org.apache.kafka.connect.runtime.Worker:454)
[2018-10-04 17:49:38,109] INFO local-file-source|task0 ProducerConfig values: 
        acks = all
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 2147483647
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 9223372036854775807
        max.in.flight.requests.per.connection = 1
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 2147483647
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:279)
[2018-10-04 17:49:38,129] INFO local-file-source|task0 Kafka version : 2.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-10-04 17:49:38,129] INFO local-file-source|task0 Kafka commitId : 00a7bd8b636c184e (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-10-04 17:49:38,135] INFO local-file-source|task0 WorkerSourceTask{id=local-file-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:199)
[2018-10-04 17:49:38,135] INFO | Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-10-04 17:49:48,138] INFO local-file-source|offsets0 WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-10-04 17:49:48,139] INFO local-file-source|offsets0 WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2018-10-04 17:49:58,144] INFO local-file-source|offsets0 WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-10-04 17:49:58,145] INFO local-file-source|offsets0 WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
...
[2018-10-04 17:50:58,166] INFO local-file-source|offsets0 WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-10-04 17:50:58,166] INFO local-file-source|offsets0 WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2018-10-04 17:51:00,768] INFO | Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2018-10-04 17:51:00,769] INFO | Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:223)
[2018-10-04 17:51:00,774] INFO | Stopped http_8083@3d36dff4{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341)
...
{noformat}

Also, these MDC parameters are inherited by any log messages output by the 
connector or task implementation within the Connect worker or task thread. 
However, threads created by the connector itself will not carry these MDC 
parameters unless the connector's code explicitly propagates them (see the 
sketch below).
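
For connector authors who do spawn their own threads, one possible way to 
carry the context over is SLF4J's standard context-map copy. A sketch; the 
wrapper and the names in it are hypothetical, not part of the PR:

{noformat}
import java.util.Map;
import org.slf4j.MDC;

public class MdcPropagation {
    // Wrap a task so it runs with the submitting thread's MDC values.
    static Runnable withMdc(Runnable task) {
        // Snapshot the current thread's MDC (may be null if nothing is set).
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            if (context != null) {
                MDC.setContextMap(context); // install the snapshot in the child thread
            }
            try {
                task.run();
            } finally {
                MDC.clear(); // avoid leaking context into a pooled thread
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        MDC.put("connector.name", "local-file-source");
        Thread child = new Thread(withMdc(() ->
                System.out.println("child sees: " + MDC.get("connector.name"))));
        child.start();
        child.join();
    }
}
{noformat}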

The goal is to make the logs much easier to grep. Filter on the connector 
name, for example, to see everything that the worker does for that connector 
(minus the connector management operations), or filter on a task number to 
see what's going on in the thread that is running that task.
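
For instance, assuming the conversion pattern above and a log file named 
{{connect.log}} (both hypothetical for this illustration; "|" is literal in a 
basic grep pattern):

{noformat}
# everything the worker logs for this connector, in any scope
grep 'local-file-source|' connect.log

# only the thread running task 0 of this connector
grep 'local-file-source|task0' connect.log
{noformat}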


> Provide more context in Kafka Connect log messages using MDC
> ------------------------------------------------------------
>
>                 Key: KAFKA-3816
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3816
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.1
>            Reporter: Randall Hauch
>            Priority: Critical
>
> Currently it is relatively difficult to correlate individual log messages 
> with the various threads and activities that are going on within a Kafka 
> Connect worker, let alone a cluster of workers. Log messages should provide 
> more context to make this correlation easier and to allow log scraping tools 
> to coalesce related log messages.
> One simple way to do this is by using _mapped diagnostic contexts_, or MDC. 
> This is supported by the SLF4J API, and by the Logback and Log4J logging 
> frameworks.
> Basically, the framework would be changed so that each thread is configured 
> with one or more MDC parameters using the 
> {{org.slf4j.MDC.put(String,String)}} method in SLF4J. Once that thread is 
> configured, all log messages emitted from that thread carry that context, 
> and the log output can then be configured to include those parameters.
> It would be ideal to define a convention for connectors and the Kafka Connect 
> framework: with a single, shared set of MDC parameters, the logging framework 
> can reference the same parameters in its message formats.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
