[jira] [Created] (KAFKA-7543) Kafka Connect JDBC Sink failing to establish connection to Topic, while the connection is working fine with standalone consumer
Kashyap Ivaturi created KAFKA-7543:

Summary: Kafka Connect JDBC Sink failing to establish connection to Topic, while the connection is working fine with standalone consumer
Key: KAFKA-7543
URL: https://issues.apache.org/jira/browse/KAFKA-7543
Project: Kafka
Issue Type: Task
Components: KafkaConnect
Reporter: Kashyap Ivaturi

Hi,

I am trying to build a Kafka Connect JDBC Sink Connector so that my DB is updated with the data I receive in a Kafka topic. I have implemented JDBC Source Connectors before and they worked very well, but when I run the Sink Connector, its internal consumer fails to connect to the topic and is disconnected from the Kafka broker, and this repeats in a loop. With TRACE logging enabled I get the details below in the log.

Any idea why the consumer is unable to connect to the topic? When I use a standalone consumer from another application, it connects to the same topic and reads messages from it without problems. Please let me know if you have any suggestions.

[2018-10-24 23:03:24,134] INFO WorkerSinkTask{id=hrmsAckEvents-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:268)
[2018-10-24 23:03:24,135] TRACE WorkerSinkTask{id=hrmsAckEvents-0} Polling consumer with timeout 4875 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
[2018-10-24 23:03:24,136] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,136] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Sending GroupCoordinator request to broker messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:183)
[2018-10-24 23:03:24,281] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Initiating connection to node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,293] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,295] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,346] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,365] DEBUG Added sensor with name node--1.bytes-sent (org.apache.kafka.common.metrics.Metrics:404)
[2018-10-24 23:03:24,367] DEBUG Added sensor with name node--1.bytes-received (org.apache.kafka.common.metrics.Metrics:404)
[2018-10-24 23:03:24,374] DEBUG Added sensor with name node--1.latency (org.apache.kafka.common.metrics.Metrics:404)
[2018-10-24 23:03:24,376] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.network.Selector:195)
[2018-10-24 23:03:24,377] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Completed connection to node -1. Fetching API versions. (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,377] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Initiating API versions fetch from node -1. (org.apache.kafka.clients.NetworkClient:183)
[2018-10-24 23:03:24,378] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] No version information found when sending API_VERSIONS with correlation id 1 to node -1. Assuming version 1. (org.apache.kafka.clients.NetworkClient:135)
[2018-10-24 23:03:24,380] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Sending API_VERSIONS {} with correlation id 1 to node -1 (org.apache.kafka.clients.NetworkClient:135)
[2018-10-24 23:03:24,385] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,389] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,724] TRACE [Consumer clientId=consumer-1, groupId=hrmsack] Found least loaded node messaging-rtp3.cisco.com:9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:123)
[2018-10-24 23:03:24,725] DEBUG [Consumer clientId=consumer-1, groupId=hrmsack] Connection with messaging-rtp3.cisco.com/64.101.96.6 disconnected (org.apache.kafka.common.network.Selector:189)
java.io.EOFException at

[jira] [Created] (KAFKA-7529) Kafka Connect JDBC doesn't push new records to Kafka Topic unless the connector is restarted
Kashyap Ivaturi created KAFKA-7529:

Summary: Kafka Connect JDBC doesn't push new records to Kafka Topic unless the connector is restarted
Key: KAFKA-7529
URL: https://issues.apache.org/jira/browse/KAFKA-7529
Project: Kafka
Issue Type: Task
Components: KafkaConnect
Reporter: Kashyap Ivaturi

Hi,

We have a Kafka Connect JDBC Source Connector that polls an Oracle table for new records every minute and pushes them to a Kafka topic. New records are identified by an incrementing column. In general everything works well, but once in a while new records with higher values in the incrementing column do not get pushed to the topic. There is no sign of any error in the logs, and the connector is in the running state. Only after we restart the connector are the new records pushed to the topic.

Any idea in what situation this can happen?

Rgds
Kashyap.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

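For readers unfamiliar with polling on an incrementing column, the general idea can be sketched as follows. This is only a minimal illustration using Python's built-in sqlite3 module in place of Oracle; the table name `events`, its columns, and the helper `poll_new_rows` are hypothetical and are not taken from the connector's implementation or from the reporter's setup.

```python
import sqlite3


def poll_new_rows(conn, last_seen_id):
    """Return rows with id greater than last_seen_id, plus the updated offset.

    Hypothetical helper: it mimics the idea of incrementing-column polling,
    not the actual query issued by the JDBC source connector.
    """
    rows = conn.execute(
        "SELECT id, payload FROM events WHERE id > ? ORDER BY id ASC",
        (last_seen_id,),
    ).fetchall()
    # The offset only advances when at least one new row was returned.
    new_offset = rows[-1][0] if rows else last_seen_id
    return rows, new_offset


# In-memory stand-in for the source table (assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?)", [(1, "a"), (2, "b")])

# First poll starts from offset 0 and sees both existing rows.
first_batch, offset_after_first = poll_new_rows(conn, 0)

# Polling again without new rows returns nothing and keeps the offset.
empty_batch, offset_after_empty = poll_new_rows(conn, offset_after_first)

# A newly inserted row with a higher id is picked up by the next poll.
conn.execute("INSERT INTO events VALUES (3, 'c')")
later_batch, offset_after_later = poll_new_rows(conn, offset_after_empty)
```

In this sketch a row is returned only if its id is strictly greater than the stored offset, so the stored offset and the visibility of rows at poll time together decide what a poll returns.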
[jira] [Created] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working
Kashyap Ivaturi created KAFKA-7365:

Summary: max.poll.records setting in Kafka Consumer is not working
Key: KAFKA-7365
URL: https://issues.apache.org/jira/browse/KAFKA-7365
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Kashyap Ivaturi

Hi,

I have a requirement to consume messages one by one: each message needs additional processing, after which I manually commit the offset. This works well most of the time, until I receive a large batch of records that takes longer to process; I then get a CommitFailedException for the last set of records even though they were processed. I am able to reconnect, but the consumer then picks up some messages that I had already processed. I don't want this to happen, because it creates duplicates in the target systems I integrate with while processing the messages.

I decided that, even when there are more messages in the queue, I would like to control how many records I process per poll. To reproduce the scenario, I started the consumer with 'max.poll.records' set to '1' and then pushed 4 messages into the topic the consumer is listening to. I expected the consumer to process only 1 message because of the 'max.poll.records' setting, but it processed all 4 messages in a single poll.

Any idea why it did not honor the 'max.poll.records' setting, or is some other setting overriding it? I would appreciate your help or guidance in troubleshooting this issue.

Here is the log of my Consumer config when it starts:

2018-08-28 08:29:47.873 INFO 91121 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [messaging-rtp3.cisco.com:9093]
check.crcs = true
client.id =
connections.max.idle.ms = 54
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = empestor
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 1
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 4
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /kafka/certs/empestor/certificates/kafka.client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2018-08-28 08:29:48.079 INFO 91121 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
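
As a note on the setting discussed in KAFKA-7365: 'max.poll.records' caps the number of records returned by a single poll() call; it does not limit how many records a consumer handles overall. A consumer that polls in a loop would therefore still work through all 4 messages, one per poll. The sketch below simulates that behavior in plain Python. `FakeConsumer` and `drain` are hypothetical stand-ins written for this illustration and are not the Kafka client API; the real consumer's fetching, commit and rebalance behavior is more involved.

```python
from collections import deque


class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer (not the Kafka client)."""

    def __init__(self, records, max_poll_records):
        self._pending = deque(records)
        self._max_poll_records = max_poll_records
        self.committed = []

    def poll(self):
        # Hand out at most max_poll_records records per call.
        batch = []
        while self._pending and len(batch) < self._max_poll_records:
            batch.append(self._pending.popleft())
        return batch

    def commit(self, record):
        # Record the commit so the order of commits can be inspected.
        self.committed.append(record)


def drain(consumer, handle):
    """Poll until no records remain, committing after each processed record."""
    batch_sizes = []
    while True:
        batch = consumer.poll()
        if not batch:
            break
        batch_sizes.append(len(batch))
        for record in batch:
            handle(record)
            consumer.commit(record)
    return batch_sizes


processed = []
consumer = FakeConsumer(["m1", "m2", "m3", "m4"], max_poll_records=1)
batch_sizes = drain(consumer, processed.append)
```

Here every poll returns a batch of size 1, yet all four messages end up processed and committed, because the loop keeps polling. Logging the size of each batch returned by poll() is one way to tell this apart from a single poll returning four records.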