JDBC Source Connector holding multiple active sessions
Hi everyone, we are using the Kafka JDBC connector to connect to a Redshift database. We created three connectors to pull data from three different tables into three different topics. We observed that each connector holds multiple active sessions, and even if we delete the connectors, the active sessions remain open and are not closed. Has anyone faced the same kind of issue? Is there any configuration available to restrict this kind of behaviour? Does anyone know how to overcome it?
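For anyone who needs to confirm or clean up the leftovers while waiting for a proper fix: Redshift exposes open sessions in the STV_SESSIONS system table, and a stale session can be terminated server-side. A diagnostic sketch, assuming sufficient privileges; the database name and the process id are placeholders:

```sql
-- List sessions currently open against the connectors' database
SELECT process, user_name, db_name, starttime
FROM stv_sessions
WHERE db_name = 'mydb';

-- Terminate a leftover session by its process id
SELECT pg_terminate_backend(12345);
```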
How to get a Date value before 1970 in Kafka Connect
Hi guys, I am using the JDBC source connector to take data from AWS Redshift into Kafka. One of the fields has the datatype Date. Values from 1970 onwards work fine, but if the value is before 1970 it comes out as 00:00:00:Z. Does anyone know how to solve this problem? Has anyone experienced this scenario?
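For what it's worth, Kafka Connect's Date logical type is just a count of days since 1970-01-01 stored as an integer, so pre-1970 dates come through as negative day counts rather than something unrepresentable; if the consumer shows 00:00:00:Z it is worth checking whether the consuming side drops the sign or floors negative values. A minimal stdlib sketch of the round trip (the dates are illustrative):

```java
import java.time.LocalDate;

public class PreEpochDate {
    public static void main(String[] args) {
        // Days since 1970-01-01: negative for pre-epoch dates
        long epochDays = LocalDate.of(1965, 6, 15).toEpochDay();
        System.out.println(epochDays);                        // -1661

        // The calendar date is fully recoverable from the negative count
        System.out.println(LocalDate.ofEpochDay(epochDays));  // 1965-06-15
    }
}
```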
How to use an SMT to add a header to my messages?
Hi everyone, I am getting data from the SFTP CSV source connector, and I need to add a header to each message so it can be consumed on the Java side. Does anyone know how to add a header to the messages while using the connector? Thanks in advance.
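For readers finding this later: Kafka 3.0+ ships a built-in `InsertHeader` SMT that adds a fixed header to every record; on the 2.x versions current at the time of this thread, a custom SMT implementing `Transformation` was the usual route. An illustrative fragment for the newer SMT (the header name and value are assumptions):

```json
"transforms": "addHeader",
"transforms.addHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
"transforms.addHeader.header": "source",
"transforms.addHeader.value.literal": "sftp-csv"
```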
Problem while sending data
Hi all, I am trying to send data from a Kafka Java producer in Avro format, but the data is never sent. The statements before and after the send() call execute correctly; only the send itself does nothing, although the schema is registered successfully. There are no logs or error messages, and other Kafka applications are working fine. Does anyone have any idea about this?

20:02:14.059 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
20:02:14.062 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
20:02:14.075 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
20:02:14.217 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
20:02:14.217 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
20:02:14.345 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -1: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(1): 0)
20:02:14.346 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null)
20:02:14.357 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: ugUXR7FWR7uXIGWcpJYdLA
20:02:14.357 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='ugUXR7FWR7uXIGWcpJYdLA', nodes=[localhost:9092 (id: 1 rack: null)], partitions=[], controller=localhost:9092 (id: 1 rack: null)}
*GOING TO INSERT*
20:02:14.386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: 1 rack: null) for sending metadata request
20:02:14.386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: 1 rack: null) using address localhost/127.0.0.1
20:02:14.389 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
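A common cause of exactly this symptom is that send() is asynchronous: the record only sits in the producer's buffer, and if the application exits before the batch is transmitted, nothing is sent and no error is ever surfaced. A sketch of how to force any failure into the open, assuming kafka-clients on the classpath; the topic name is taken from this thread and the wrapper method is illustrative:

```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public final class SendAndVerify {
    // 'producer' is the already-configured Avro producer from the question
    static void sendOnce(KafkaProducer<String, Object> producer, Object value)
            throws ExecutionException, InterruptedException {
        // send() only enqueues; get() blocks until the broker acks (or throws)
        RecordMetadata md = producer.send(new ProducerRecord<>("n-run", value)).get();
        System.out.printf("written to %s-%d@%d%n", md.topic(), md.partition(), md.offset());

        // For fire-and-forget paths: flush and close before the JVM exits,
        // otherwise buffered batches are silently dropped
        producer.flush();
        producer.close();
    }
}
```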
Re: Schema validation problem
When I check the Kafka logs, this is there:

Invalid record due to REST client error (io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: This ID is banned; error code: 40403
at io.confluent.kafka.schemaregistry.validator.LruSchemaRegistryClient.getSchemaByIdFromRegistry(LruSchemaRegistryClient.java:188)

On Thu, Jul 16, 2020, 09:31 vishnu murali wrote:

> Hi all
>
> My schema is:
>
> {
>   "fields": [
>     { "name": "ID", "type": "long" },
>     { "name": "Group", "type": ["null", { "avro.java.string": "String", "type": "string" }] },
>     { "name": "Key", "type": ["null", { "avro.java.string": "String", "type": "string" }] },
>     { "name": "Value", "type": ["null", { "avro.java.string": "String", "type": "string" }] }
>   ],
>   "name": "Runconfig",
>   "namespace": "com.ns.vishnu.kafkaavorest.Model",
>   "type": "record"
> }
>
> My data is:
>
> Runconfig rc = Runconfig.newBuilder()
>     .setID(1L)
>     .setGroup("new")
>     .setKey("year")
>     .setValue("11")
>     .build();
>
> runTemlate.send("n-run", rc);
>
> The exception is:
>
> org.apache.kafka.common.InvalidRecordException: This record has failed the
> validation on broker and hence be rejected.
>
> What is the validation mistake in this? I checked every datatype, and it
> is correct. Any suggestions on this?
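A note for anyone hitting this: the failing check is not the Avro datatypes at all; it is broker-side schema validation (a Confluent Platform feature enabled per topic via `confluent.value.schema.validation`), and the broker's validator is being refused by Schema Registry when it tries to resolve the record's schema ID (the 404xx codes are the registry's not-found family of errors). While the registry lookup is being sorted out, validation can be disabled on the topic; a sketch, with the broker address as an assumption:

```shell
kafka-configs --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name n-run \
  --add-config confluent.value.schema.validation=false
```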
Re: Kafka SFTP connector
Thanks a lot, Ricardo. I will try that.

On Wed, Jul 15, 2020, 19:11 Ricardo Ferreira wrote:

> The `tls.private.key` type is indeed modeled as a password but for the
> sake of how to assign values; it is just a string. Therefore, you can
> provide any valid string to it regardless if it is long or not.
>
> Regarding escaping, I understand how this can be a PITA. I would recommend
> either:
>
> 1. *Using Postman variables*: define a variable with the public key and
> then reference in the payload using the `{{$var.name}}` notation.
>
> 2. *Use a Bastion Server and cURL*: you can use a bastion server to SSH
> from your machine and then have access to the machine that hosts your Kafka
> Connect server. While in there; you can use cURL to execute the POST
> command along with the `--data-urlencode` parameter.
>
> Thanks,
>
> -- Ricardo
>
> On 7/14/20 11:30 PM, vishnu murali wrote:
>
> > Hi Ricardo
> >
> > Thanks for the response
> >
> > But the tls.private.key type was a password and I am giving request
> > through postman
> >
> > In this place how can we give that public key value in postman as a string
> >
> > That public key having so many characters which not included within that
> > string double quotes..
> >
> > More escape sequence will be there in the public key
> >
> > In this situation do u know how can we use this??
> >
> > On Wed, Jul 15, 2020, 02:06 Ricardo Ferreira wrote:
> >
> >> Vishnu,
> >>
> >> A public key file can be specified via the property `tls.public.key`.
> >> Thanks,
> >>
> >> -- Ricardo
> >>
> >> On 7/14/20 6:09 AM, vishnu murali wrote:
> >>
> >>> Hi all,
> >>>
> >>> I am using SFTP connector which that SFTP connection can be accessed by
> >>> using public key file.
> >>>
> >>> How can I give this configuration in postman to start sftp connector?
> >>>
> >>> Anyone have any suggestions?
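One way to sidestep the escaping entirely, following the cURL suggestion above: let a JSON library serialize the key file into a single correctly escaped string before pasting it into the payload. A sketch (the file name and its contents are illustrative stand-ins for the real key):

```shell
# A stand-in for the real multi-line private key file
printf -- '-----BEGIN KEY-----\nabc123\n-----END KEY-----\n' > /tmp/demo.key

# Emit the file as one JSON string literal: quotes and newlines come out
# escaped, ready to drop into the connector's JSON config in Postman or cURL
python3 -c 'import json, sys; print(json.dumps(open(sys.argv[1]).read()))' /tmp/demo.key
```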
Schema validation problem
Hi all,

My schema is:

{
  "fields": [
    { "name": "ID", "type": "long" },
    { "name": "Group", "type": ["null", { "avro.java.string": "String", "type": "string" }] },
    { "name": "Key", "type": ["null", { "avro.java.string": "String", "type": "string" }] },
    { "name": "Value", "type": ["null", { "avro.java.string": "String", "type": "string" }] }
  ],
  "name": "Runconfig",
  "namespace": "com.ns.vishnu.kafkaavorest.Model",
  "type": "record"
}

My data is:

Runconfig rc = Runconfig.newBuilder()
    .setID(1L)
    .setGroup("new")
    .setKey("year")
    .setValue("11")
    .build();

runTemlate.send("n-run", rc);

The exception is:

org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence be rejected.

What is the validation mistake in this? I checked every datatype, and it is correct. Any suggestions on this?
Namespace problem in schema registry
Hi all, I have a question about the namespace in Schema Registry. If the schema is generated automatically by the JDBC source connector, it does not have a namespace field or value. But if I create the schema manually with a namespace, register it in Schema Registry, and then try to run the connector, a "Schema not found" exception comes up. How can I handle this? I need the namespace for deserialization on the Java side. Does anyone have a solution for this?
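If it helps anyone else: the registry matches schemas by their full name, so a hand-registered schema whose only difference is an added `namespace` is, to the serializer, a different schema from the one the connector auto-generates, which is consistent with the "Schema not found" lookup failure. A hedged sketch of the shape to keep in sync (record and field names are illustrative); the `name` and `namespace` registered under the subject must match what the connector actually produces, or the connector's converter must be allowed to auto-register its own version:

```json
{
  "type": "record",
  "name": "sample",
  "namespace": "com.example.model",
  "fields": [
    { "name": "id", "type": "long" }
  ]
}
```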
Re: Kafka SFTP connector
Hi Ricardo, thanks for the response. But the tls.private.key type is a password, and I am sending the request through Postman. How can I give the public key value in Postman as a string? The public key has many characters that cannot sit inside the string's double quotes without escaping, and it contains many escape sequences. In this situation, do you know how we can use it?

On Wed, Jul 15, 2020, 02:06 Ricardo Ferreira wrote:

> Vishnu,
>
> A public key file can be specified via the property `tls.public.key`.
> Thanks,
>
> -- Ricardo
>
> On 7/14/20 6:09 AM, vishnu murali wrote:
>
> > Hi all,
> >
> > I am using SFTP connector which that SFTP connection can be accessed by
> > using public key file.
> >
> > How can I give this configuration in postman to start sftp connector?
> >
> > Anyone have any suggestions?
Kafka SFTP connector
Hi all, I am using the SFTP connector, and the SFTP connection is accessed using a public key file. How can I give this configuration in Postman to start the SFTP connector? Does anyone have any suggestions?
Schema validation in JDBC Connector
Hi guys, first I create a topic and register a schema for it, and then I take data from MySQL using the JDBC source connector. How can I validate that the data coming from MySQL matches the schema I set in Schema Registry? Does anyone have any idea about this?
Re: Problem in reading From JDBC SOURCE
Hi, below is the script I used to create the table in MySQL:

CREATE TABLE `sample` (
  `id` varchar(45) NOT NULL,
  `a` decimal(10,3) DEFAULT NULL,
  `b` decimal(10,3) DEFAULT NULL,
  `c` decimal(10,3) DEFAULT NULL,
  `d` decimal(10,3) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Table data: id = 1, a = 0.002, b = 2.250, c = 0.789, d = 0.558

On Thu, Jul 2, 2020, 19:50 Ricardo Ferreira wrote:

> Vishnu,
>
> I think it is hard to troubleshoot things without the proper context. In
> your case, could you please share an example of the rows contained in
> the table `sample`? As well as its DDL?
>
> -- Ricardo
>
> On 7/2/20 9:29 AM, vishnu murali wrote:
>
> > I went through that documentation, where it describes that DECIMAL is
> > not supported in MySQL. And there is also no example for MySQL, so is
> > there any other sample with MySQL?
> >
> > On Thu, Jul 2, 2020, 18:49 Robin Moffatt wrote:
> >
> >> Check out this article where it covers decimal handling:
> >>
> >> https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics
> >>
> >> --
> >>
> >> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >>
> >> On Thu, 2 Jul 2020 at 13:54, vishnu murali wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I am having a problem while reading from MySQL using the JDBC source:
> >>> I receive values like the below. Does anyone know the reason and how
> >>> to solve this?
> >>>
> >>> "a": "Aote",
> >>> "b": "AmrU",
> >>> "c": "AceM",
> >>> "d": "Aote",
> >>>
> >>> instead of
> >>>
> >>> "a": 0.002,
> >>> "b": 0.465,
> >>> "c": 0.545,
> >>> "d": 0.100
> >>>
> >>> This is my configuration:
> >>>
> >>> {
> >>>   "name": "sample",
> >>>   "config": {
> >>>     "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> >>>     "connection.url": "jdbc:mysql://localhost:3306/sample",
> >>>     "connection.user": "",
> >>>     "connection.password": "xxx",
> >>>     "topic.prefix": "dample-",
> >>>     "poll.interval.ms": 360,
> >>>     "table.whitelist": "sample",
> >>>     "schemas.enable": "false",
> >>>     "mode": "bulk",
> >>>     "value.converter.schemas.enable": "false",
> >>>     "numeric.mapping": "best_fit",
> >>>     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >>>     "transforms": "createKey,extractInt",
> >>>     "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
> >>>     "transforms.createKey.fields": "ID",
> >>>     "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
> >>>     "transforms.extractInt.field": "ID"
> >>>   }
> >>> }
Re: Problem in reading From JDBC SOURCE
I went through that documentation, where it describes that DECIMAL is not supported in MySQL. And there is also no example for MySQL, so is there any other sample with MySQL?

On Thu, Jul 2, 2020, 18:49 Robin Moffatt wrote:

> Check out this article where it covers decimal handling:
>
> https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
> On Thu, 2 Jul 2020 at 13:54, vishnu murali wrote:
>
> > Hi Guys,
> >
> > I am having a problem while reading from MySQL using the JDBC source:
> > I receive values like the below. Does anyone know the reason and how
> > to solve this?
> >
> > "a": "Aote",
> > "b": "AmrU",
> > "c": "AceM",
> > "d": "Aote",
> >
> > instead of
> >
> > "a": 0.002,
> > "b": 0.465,
> > "c": 0.545,
> > "d": 0.100
> >
> > This is my configuration:
> >
> > {
> >   "name": "sample",
> >   "config": {
> >     "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> >     "connection.url": "jdbc:mysql://localhost:3306/sample",
> >     "connection.user": "",
> >     "connection.password": "xxx",
> >     "topic.prefix": "dample-",
> >     "poll.interval.ms": 360,
> >     "table.whitelist": "sample",
> >     "schemas.enable": "false",
> >     "mode": "bulk",
> >     "value.converter.schemas.enable": "false",
> >     "numeric.mapping": "best_fit",
> >     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >     "transforms": "createKey,extractInt",
> >     "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
> >     "transforms.createKey.fields": "ID",
> >     "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
> >     "transforms.extractInt.field": "ID"
> >   }
> > }
Problem in reading From JDBC SOURCE
Hi guys, I am having a problem while reading from MySQL using the JDBC source: I receive values like

"a": "Aote",
"b": "AmrU",
"c": "AceM",
"d": "Aote",

instead of

"a": 0.002,
"b": 0.465,
"c": 0.545,
"d": 0.100

Does anyone know the reason and how to solve this? This is my configuration:

{
  "name": "sample",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/sample",
    "connection.user": "",
    "connection.password": "xxx",
    "topic.prefix": "dample-",
    "poll.interval.ms": 360,
    "table.whitelist": "sample",
    "schemas.enable": "false",
    "mode": "bulk",
    "value.converter.schemas.enable": "false",
    "numeric.mapping": "best_fit",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "ID",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "ID"
  }
}
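One workaround that is sometimes used when `numeric.mapping` does not take effect for MySQL DECIMAL columns (they then arrive as Connect Decimal logical-type bytes, which the JsonConverter base64-encodes into strings like the ones above) is to cast the affected fields explicitly with the built-in Cast SMT. An illustrative fragment to merge into the config above (field names taken from this thread; treat it as a sketch, not a confirmed fix):

```json
"transforms": "createKey,extractInt,castDecimals",
"transforms.castDecimals.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castDecimals.spec": "a:float64,b:float64,c:float64,d:float64"
```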
SFTP config details as JSON
Hey guys, one of the values in the SFTP configuration is key.schema. I am sending the request through Postman as a JSON request, so how can I give the schema details, given that all of its keys and values need double quotes? Could anyone explain? If I give it like this, the exception below comes up.

"key.schema": "{schema:{type:struct,fields:[],optional:false,name:defaultkeyschemaname},payload:{}}",

*Exception:*
org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting double-quote to start field name at [Source: (String)"{schema:{type:struct,fields:[],optional:false,name:defaultkeyschemaname},payload:{}}"; line: 1, column: 3] for configuration Could not read schema from 'key.schema'
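For the record, the value of `key.schema` has to be the schema JSON serialized as a single string, i.e. with every inner quote escaped as `\"`; the parse error above is Jackson seeing bare field names. A hedged sketch of the escaped form (the schema name and field are illustrative; check the SFTP connector's documentation for the exact schema JSON shape it expects):

```json
"key.schema": "{\"name\":\"com.example.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false}}}"
```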
SFTP config problem
Hey guys, I changed the properties in the SFTP CSV source and it is working fine now. Next I set schema generation enabled to true, so the connector adds the schema data to every record written to the topic. When I set generation to false, it asks for key.schema and value.schema instead, but both are JSON, and I have not figured out how to give JSON as the value of a JSON key in the SFTP configuration:

{
  "Schema": {
    "type": "struct",
    "fields": [],
    "optional": false,
    "name": "abc"
  }
}

How can I set this JSON as the value of key.schema in the JSON configuration? I keep getting typo or punctuation errors.
Re: Exception in SFTP CSV SOURCE
I suppose I forwarded it wrongly. So, is there any solution for the above exception?

On Tue, May 19, 2020, 22:35 Robin Moffatt wrote:

> Hi Vishnu,
>
> I think there is a problem with your email client, it's just sent a
> duplicate of each of your emails from yesterday?
>
> thanks, Robin.
>
> On Tue, 19 May 2020 at 16:44, vishnu murali wrote:
>
> > Hi Guys
> >
> > By trying the SFTP CSV source I am getting this exception with this
> > configuration.
> >
> > What is the issue and how do I resolve it? Does anyone know?
> >
> > *Config:*
> > {
> >     "name": "CsvSFTP1",
> >     "config": {
> >         "tasks.max": "1",
> >         "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
> >         "cleanup.policy": "NONE",
> >         "behavior.on.error": "IGNORE",
> >         "input.path": "/mnt/c/Users/vmuralidharan/Desktop",
> >         "error.path": "/mnt/c/Users/vmuralidharan/Desktop/error",
> >         "finished.path": "/mnt/c/Users/vmuralidharan/Desktop/finished",
> >         "input.file.pattern": "orm.csv",
> >         "kafka.topic": "sftp-testing-topic",
> >         "csv.first.row.as.header": "true",
> >         "schema.generation.enabled": "false"
> >     }
> > }
> >
> > *Exception StackTrace:*
> > org.apache.kafka.connect.errors.ConnectException: Can not get connection to SFTP:
> >     at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:115)
> >     at io.confluent.connect.sftp.connection.SftpConnection.<init>(SftpConnection.java:66)
> >     at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.getSftpConnection(AbstractSftpSourceConnectorConfig.java:557)
> >     at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.<init>(AbstractSftpSourceConnectorConfig.java:242)
> >     at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:101)
> >     at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:157)
> >     at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:47)
> >     at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
> >     at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
> >     at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
> >     at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:260)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)
> >     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >     at java.base/java.lang.Thread.run(Thread.java:834)
> > Caused by: com.jcraft.jsch.JSchException: java.net.ConnectException: Connection refused (Connection refused)
> >     at com.jcraft.jsch.Util.createSocket(Util.java:394)
> >     at com.jcraft.jsch.Session.connect(Session.java:215)
> >     at com.jcraft.jsch.Session.connect(Session.java:183)
> >     at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:109)
> >     ... 18 more
> > Caused by: java.net.ConnectException: Connection refused (Connection refused)
> >     at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
> >     at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
> >     at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
> >     at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
> >     at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
> >     at java.base/java.net.Socket.connect(Socket.java:609)
> >     at java.base/java.net.Socket.connect(Socket.java:558)
> >     at java.base/java.net.Socket.<init>(Socket.java:454)
> >     at java.base/java.net.Socket.<init>(Socket.java:231)
> >     at com.jcraft.jsch.Util$1.run(Util.java:362)
SFTP Connector
Hey guys, now I am trying to implement the SFTP connector using the configuration below. I am on a Windows system, so I am unsure how to set the paths. I tried the paths below in the *config*, as well as */mnt/c/users/vmuralidharan*, but neither works. What do I need to do?

{
    "name": "CsvSFTP11",
    "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
        "cleanup.policy": "NONE",
        "behavior.on.error": "IGNORE",
        "input.path": " C:\\Users\\vmuralidharan",
        "error.path": "C:\\Users\\vmuralidharan\\Desktop\\error",
        "finished.path": "C:\\Users\\vmuralidharan\\Desktop\\finished",
        "input.file.pattern": "purchase.csv",
        "kafka.topic": "sftp-testing-topic",
        "csv.first.row.as.header": "true",
        "schema.generation.enabled": "false"
    }
}

How can I solve this?
Exception in SFTP CSV SOURCE
Hi guys, by trying the SFTP CSV source I am getting this exception with this configuration. What is the issue and how do I resolve it? Does anyone know?

*Config:*
{
    "name": "CsvSFTP1",
    "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
        "cleanup.policy": "NONE",
        "behavior.on.error": "IGNORE",
        "input.path": "/mnt/c/Users/vmuralidharan/Desktop",
        "error.path": "/mnt/c/Users/vmuralidharan/Desktop/error",
        "finished.path": "/mnt/c/Users/vmuralidharan/Desktop/finished",
        "input.file.pattern": "orm.csv",
        "kafka.topic": "sftp-testing-topic",
        "csv.first.row.as.header": "true",
        "schema.generation.enabled": "false"
    }
}

*Exception StackTrace:*
org.apache.kafka.connect.errors.ConnectException: Can not get connection to SFTP:
    at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:115)
    at io.confluent.connect.sftp.connection.SftpConnection.<init>(SftpConnection.java:66)
    at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.getSftpConnection(AbstractSftpSourceConnectorConfig.java:557)
    at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.<init>(AbstractSftpSourceConnectorConfig.java:242)
    at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:101)
    at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:157)
    at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:47)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:260)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.jcraft.jsch.JSchException: java.net.ConnectException: Connection refused (Connection refused)
    at com.jcraft.jsch.Util.createSocket(Util.java:394)
    at com.jcraft.jsch.Session.connect(Session.java:215)
    at com.jcraft.jsch.Session.connect(Session.java:183)
    at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:109)
    ... 18 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at java.base/java.net.Socket.connect(Socket.java:558)
    at java.base/java.net.Socket.<init>(Socket.java:454)
    at java.base/java.net.Socket.<init>(Socket.java:231)
    at com.jcraft.jsch.Util$1.run(Util.java:362)
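For completeness: the `Connection refused` root cause here is the connector failing to reach an SFTP server, not a file-path problem; the config above never points at one, so the connector presumably falls back to defaults where nothing is listening. The server coordinates are given roughly like this (host, port and credentials are illustrative; check the SFTP connector's documentation for the exact property names):

```json
"sftp.host": "sftp.example.com",
"sftp.port": "22",
"sftp.username": "demo-user",
"sftp.password": "demo-pass"
```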
SFTP Connector
Hey guys, now I am trying to implement the SFTP connector with the configuration below. I am using a Windows system, so I have doubts about how to set the paths. I tried setting them as in the *config* below, as well as */mnt/c/users/vmuralidharan*, but neither works. What do I need to do?
{
  "name": "CsvSFTP11",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy": "NONE",
    "behavior.on.error": "IGNORE",
    "input.path": " C:\\Users\\vmuralidharan",
    "error.path": "C:\\Users\\vmuralidharan\\Desktop\\error",
    "finished.path": "C:\\Users\\vmuralidharan\\Desktop\\finished",
    "input.file.pattern": "purchase.csv",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "false"
  }
}
How do I solve this?
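Two observations on the config above. First, the `input.path` value starts with a stray leading space (`" C:\\Users\\..."`), which by itself makes the path invalid. Second, as far as I understand the SFTP source connector, these directories are resolved on the SFTP server the connector logs into, not on the Connect worker's Windows filesystem, so they should be Unix-style absolute paths as seen by that server. A sketch with placeholder paths, assuming an SFTP server whose home tree exposes the user's directory:

```json
{
  "input.path": "/home/vmuralidharan/input",
  "error.path": "/home/vmuralidharan/error",
  "finished.path": "/home/vmuralidharan/finished"
}
```

All three directories must already exist on the server, or the connector will fail at startup.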
Problem in Docker
Hi, I am running the cp-all-in-one Docker setup for Confluent Kafka, and I am trying the JDBCSourceConnector there. It is showing this result:
{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/sample for configuration Couldn't open connection to jdbc:mysql://localhost:3306/sample\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/sample for configuration Couldn't open connection to jdbc:mysql://localhost:3306/sample\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
How can I add an external JAR file to that Docker container? I am new to Docker and do not even know whether this is possible. Does anyone know the solution?
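"No suitable driver found" means the MySQL JDBC driver JAR is not on the connector's classpath inside the `connect` container. One common fix is to mount the driver into the JDBC connector's plugin directory via docker-compose — a sketch where the service name, plugin path, and JAR filename are assumptions to adapt to your cp-all-in-one version:

```yaml
services:
  connect:
    volumes:
      # place the MySQL driver next to the JDBC connector so its classloader picks it up
      - ./mysql-connector-java-8.0.19.jar:/usr/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.19.jar
```

Then recreate the container (`docker-compose up -d connect`). Note also that inside the container `localhost:3306` refers to the container itself, not your host — you would typically point `connection.url` at the MySQL service name on the compose network, or at `host.docker.internal` if MySQL runs on the host.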
Re: JDBC source connector
Thanks Liam, but I am asking about this case: assume I have 10 records. Using the JDBC source I need to push them once, and no additional data will ever be added to that table in future. In that case I need to push the data only once, not more than once. That is the scenario I am asking about. On Thu, May 14, 2020, 19:20 Liam Clarke-Hutchinson < liam.cla...@adscale.co.nz> wrote: > Why not use autoincrement? It'll only emit new records on subsequent polls > then. > > On Thu, 14 May 2020, 11:15 pm vishnu murali, > wrote: > > > Hi Guys, > > > > I am using the mode *bulk *and poll.interval.ms *10* in the Source > > connector configuration. > > > > But I don't need to load data another time.? > > > > I need to load the data only once ?? > > > > How can I able to do this ? > > >
JDBC source connector
Hi guys, I am using mode *bulk* and poll.interval.ms *10* in the source connector configuration. But I don't want the data loaded a second time — I need to load it only once. How can I do this?
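The JDBC source has no built-in "run once" mode, so one workaround (an operational sketch, not an official feature) is to keep *bulk* mode but stretch `poll.interval.ms` far enough that only the first poll ever happens, then delete the connector after the load completes. Names and values below are placeholders:

```json
{
  "name": "mysql-one-shot",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/sample",
    "mode": "bulk",
    "poll.interval.ms": "86400000",
    "topic.prefix": "once-",
    "table.whitelist": "mytable"
  }
}
```

Once the data is in the topic: `curl -X DELETE localhost:8083/connectors/mysql-one-shot`.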
Jdbc Source Connector Config
Hi guys, I have a question about this configuration:
"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"id"
What is the purpose of these settings in JdbcSourceConnector? And can we use the source connector without them?
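For context, those two single message transforms (SMTs) run as a chain: ValueToKey copies the `id` field out of the record value into a key struct, and ExtractField$Key then unwraps that struct so the raw `id` value becomes the message key (useful for partitioning and log compaction). They are optional — without them the JDBC source emits records with a null key. A self-contained sketch of the same two steps on a plain Map, purely to illustrate the data flow (this is not the real Connect SMT API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SmtSketch {
    // Step 1: like ValueToKey — build a key struct from the named value field
    static Map<String, Object> valueToKey(Map<String, Object> value, String field) {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put(field, value.get(field));
        return key;
    }

    // Step 2: like ExtractField$Key — unwrap the key struct to the bare field value
    static Object extractField(Map<String, Object> key, String field) {
        return key.get(field);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 42);
        row.put("name", "example");
        Object messageKey = extractField(valueToKey(row, "id"), "id");
        System.out.println(messageKey); // the record key is now the bare id value
    }
}
```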
Re: Custom Connector
Thanks Tom. I am receiving data from one REST endpoint and want to post that data from the endpoint to a topic. Is that possible, or is any other connector available for it? On Tue, May 12, 2020, 13:24 Tom Bentley wrote: > Hi Vishnu, > > These are probably a good place to start: > 1. https://docs.confluent.io/current/connect/devguide.html > 2. > > https://www.confluent.io/blog/create-dynamic-kafka-connect-source-connectors/ > > Cheers, > > Tom > > > On Tue, May 12, 2020 at 7:34 AM vishnu murali > wrote: > > > Hi Guys, > > > > i am trying to create a new connector for my own purpose. > > > > Is there any guide or document which show how to create a own connector > and > > use? > > >
Need a connector to listening Rest Endpoint
Hi friends, I have a REST endpoint that is receiving data continuously, and I need to send that data to a Kafka topic. I want to solve this with a connector, because I don't want to run a separate application just to receive data from REST and forward it to Kafka. Instead of running a separate application to push the data into a topic, is there any connector available that can listen to that endpoint and automatically push into a topic?
Custom Connector
Hi guys, I am trying to create a new connector for my own purpose. Is there any guide or document that shows how to create and use one's own connector?
Re: JDBC SINK SCHEMA
Hi Robin, is it possible to integrate Apache Kafka with the Confluent Schema Registry as you said? I don't know how to do it — can you give me any reference? On Mon, May 11, 2020, 14:09 Robin Moffatt wrote: > You can use Apache Kafka as you are currently using, and just deploy Schema > Registry alongside it. > > > -- > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff > > > On Sat, 9 May 2020 at 02:16, Chris Toomey wrote: > > > You have to either 1) use one of the Confluent serializers > > < > > > https://docs.confluent.io/current/schema-registry/serdes-develop/index.html# > > > > > when you publish to the topic, so that the schema (or reference to it) is > > included, or 2) write and use a custom converter > > < > > > https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/storage/Converter.html > > > > > that knows about the data schema and can take the kafka record value and > > convert it into a kafka connect record (by implementing the toConnectData > > converter method), which is what the sink connectors are driven from. > > > > See https://docs.confluent.io/current/connect/concepts.html#converters > > > > Chris > > > > > > > > On Fri, May 8, 2020 at 6:59 AM vishnu murali > > > wrote: > > > > > Hey Guys, > > > > > > I am *using Apache **2.5 *not confluent. > > > > > > i am trying to send data from topic to database using jdbc sink > > connector. > > > > > > we need to send that data with the appropriate schema also. > > > > > > i am *not using confluent version* of kafka. > > > > > > so can anyone explain how can i do this ? > > > > > >
JDBC SINK SCHEMA
Hey guys, I am using *Apache Kafka 2.5*, not Confluent. I am trying to send data from a topic to a database using the JDBC sink connector, and we need to send the data with the appropriate schema as well. Since I am *not using the Confluent version* of Kafka, can anyone explain how I can do this?
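With plain Apache Kafka, the built-in JsonConverter can carry the schema the JDBC sink needs: set `value.converter=org.apache.kafka.connect.json.JsonConverter` and `value.converter.schemas.enable=true` on the worker or connector, and publish each message as a schema-plus-payload envelope. A sketch of that envelope — the field names and types here are illustrative:

```json
{
  "schema": {
    "type": "struct",
    "name": "sample.record",
    "optional": false,
    "fields": [
      { "field": "id",   "type": "int32",  "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "vishnu" }
}
```

The downside is that every message repeats the schema; avoiding that repetition is what Schema Registry with Avro/Protobuf serialization is for.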
Re: JDBC Sink Connector
Thank you so much, Robin. It helped me a lot to define the sink connector with upsert mode; that is very helpful. For the schema-related question I am still not getting a proper understanding, because I am using plain Apache Kafka and I don't know whether Schema Registry, ksqlDB, and the Avro serializers are present in Apache Kafka (2.5). I suppose Schema Registry and ksqlDB come with the Confluent version of Kafka. On Thu, May 7, 2020 at 1:47 PM Robin Moffatt wrote: > If you don't want to send the schema each time then serialise your data > using Avro (or Protobuf), and then the schema is held in the Schema > Registry. See https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s > > If you want to update a record instead of insert, you can use the upsert > mode. See https://www.youtube.com/watch?v=b-3qN_tlYR4&t=627s > > > -- > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff > > > On Thu, 7 May 2020 at 06:48, vishnu murali > wrote: > > > Hey Guys, > > > > i am working on JDBC Sink Conneector to take data from kafka topic to > > mysql. > > > > i am having 2 questions. > > > > i am using normal Apache Kafka 2.5 not a confluent version. > > > > 1)For inserting data every time we need to add the schema data also with > > every data,How can i overcome this situation?i want to give only the > data. > > > > 2)In certain time i need to update the existing record without adding as > a > > new record.How can i achieve this? > > >
Re: Kafka consumer
Thanks Chris, but that didn't work — I tried it too. I found the solution: @KafkaListener's default behavior is itself to take the data one record at a time. On Thu, May 7, 2020, 11:28 Chris Toomey wrote: > You can set the max.poll.records config. setting to 1 in order to pull down > and process 1 record at a time. > > See https://kafka.apache.org/documentation/#consumerconfigs . > > On Mon, May 4, 2020 at 1:04 AM vishnu murali > wrote: > > > Hey Guys, > > > > I am having a topic and in that topic I am having 3000 messages > > > > In my springboot application I want to consume the data using > > @KafkaListener() and also one by one because I need to do some tedious > > process on that Data it may take some time > > > > So within this time I don't need to consume another data. > > > > After the process is finished only I need to consume the data from the > > topic .? > > > > How can I do this? > > > > Any ideas? > > >
JDBC Sink Connector
Hey guys, I am working on the JDBC sink connector to take data from a Kafka topic to MySQL, and I have 2 questions. I am using plain Apache Kafka 2.5, not the Confluent version. 1) For inserting data, we currently need to include the schema with every record. How can I avoid this? I want to send only the data. 2) Sometimes I need to update an existing record instead of adding a new one. How can I achieve this?
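For question 2, the JDBC sink's upsert mode covers update-or-insert, keyed on a primary key taken from the record. A sketch assuming the record value carries an `id` field (adjust the URL, topic, and field names to your setup):

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://localhost:3306/sample",
  "topics": "my-topic",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id"
}
```

With `pk.mode=record_value` the connector reads the key columns from the message value; `pk.mode=record_key` would read them from the message key instead.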
Need to consume one by one data
Hi guys, I have a Kafka topic Films containing 2000 records. In a Spring Boot KafkaListener I am listening to that topic, but I need to process each record one at a time, and only after that consume the next record. How can I handle this? Any ideas for this scenario?
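If the goal is to bound how much is fetched per poll (rather than listener concurrency), the consumer's `max.poll.records` setting caps the batch at one record. A sketch of both the plain consumer property and what I believe is the matching Spring Boot property (verify the Spring key against the Spring Kafka docs):

```properties
# plain Kafka consumer config
max.poll.records=1

# Spring Boot equivalent (application.properties)
spring.kafka.consumer.max-poll-records=1
```

Note the listener still won't receive the next record until the current listener method returns, so long processing inside the listener naturally serializes consumption.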
Connect-Mirror Error
Hey guys, hereby I am posting the stack trace that occurred in connect-distributed while submitting the mirror connector configuration:

*POST*: http://localhost:8083/connectors
*Request JSON body:*
{
  "name": "us-west-sourc",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "cluster 9092",
    "target.cluster.alias": "cluster 9091",
    "source.cluster.bootstrap.servers": "localhost:9092",
    "topics": "vis-city"
  }
}

It is saying that bootstrap.servers, which has no default value, is missing — and even when I add that config to the request it responds the same. What could the problem be? I am trying to copy data from a topic in one cluster to another cluster. 1) Using the MirrorMaker command it is possible, but I want to do the same using a connector by sending a request and then copying the data. What do I need to change?

[2020-05-02 20:40:43,304] ERROR WorkerConnector{id=us-west-sourc} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector) org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:477)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
    at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:216)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:71)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
    at org.apache.kafka.connect.mirror.MirrorSourceConnector.start(MirrorSourceConnector.java:115)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1190)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:126)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1206)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1202)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:830)
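The trace shows MirrorSourceConnector building an AdminClient for the *target* cluster, which is where the missing `bootstrap.servers` comes from: the worker's own `bootstrap.servers` is separate, and the connector needs a `target.cluster.bootstrap.servers` entry of its own. A sketch adding both cluster addresses — the aliases and ports are placeholders to adapt:

```json
{
  "name": "us-west-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "source.cluster.bootstrap.servers": "localhost:9092",
    "target.cluster.bootstrap.servers": "localhost:9091",
    "topics": "vis-city"
  }
}
```

Aliases are also used as topic-name prefixes on the target, so avoid spaces in them (e.g. `source`, not `cluster 9092`).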
Re: Connect-Mirror 2.5.0
Hi Liam, thanks for the reply. I want that connector style of execution. Here: PUT /connectors/us-west-source/config HTTP/1.1 { "name": "us-west-source", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "us-west", "target.cluster.alias": "us-east", "source.cluster.bootstrap.servers": "us-west-host1:9091", "topics": ".*" } The source I can mention in source.cluster.bootstrap.servers, but where do I configure the target cluster's bootstrap server address? For example, the source is localhost:9092 and the target is localhost:8092 — where do I mention the target? On Sat, May 2, 2020, 19:13 Liam Clarke-Hutchinson wrote: > Hi Vishnu, > > As per my earlier email: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Walkthrough:RunningMirrorMaker2.0 > > In the same vein, any questions, hit me up, > > Liam Clarke-Hutchinson > > On Sat, May 2, 2020 at 9:56 PM vishnu murali > wrote: > > > Hey Guys > > > > I am using Apache version of 2.5 > > > > Correct me if I am wrong!! > > > > Here there is a jar file called Connect-Mirror2.5.0 in the libs folder.I > > think it is a connector to copy the topic data between one cluster to > > another cluster like MirrorMaker.. > > > > So I started zookeeper > > I started Kafka server > > I started connect-distributed > > > > > > So what are the json configurations to give in the Post Request to make > > that connector work.. > > > > How can I mention source ,destination clusters whitelist topics in that > > configuration file and process? > > >
Connect-Mirror 2.5.0
Hey guys, I am using Apache Kafka version 2.5. Correct me if I am wrong! There is a JAR file called connect-mirror-2.5.0 in the libs folder; I think it is a connector to copy topic data from one cluster to another, like MirrorMaker. So I started ZooKeeper, the Kafka server, and connect-distributed. What JSON configuration do I give in the POST request to make that connector work? How can I specify the source cluster, the destination cluster, and the whitelisted topics in that configuration?
Re: Connector For MirrorMaker
Hi Robin, I am using Apache Kafka. There is a script called kafka-mirror-maker.bat that takes consumer and producer properties to copy a topic from one cluster to another, and I want to do that using a connector. I wasn't aware of MirrorMaker 2 and I don't know how to download and configure it with Apache Kafka. Can you guide me on how to get started with the MirrorMaker 2 connector? On Fri, May 1, 2020, 19:13 Robin Moffatt wrote: > Are you talking about MirrorMaker 2? That runs as a connector. > > If not, perhaps you can clarify your question a bit as to what it is you > are looking for. > > > -- > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff > > > On Fri, 1 May 2020 at 13:57, vishnu murali > wrote: > > > Hi Guys > > > > Previously I asked question about the Mirror maker and it is solved now. > > > > So Now I need to know is there any connectors available for that same. > > > > Like JdbcConnector acts as a source and sink for DB connection is there > any > > connector available for performing mirror operations > > > > or > > > > does some one having own created connectors for this purpose?? > > >
Connector For MirrorMaker
Hi guys, previously I asked a question about MirrorMaker and it is solved now. Now I need to know whether any connector is available for the same task. Just as JdbcConnector acts as a source and sink for DB connections, is there any connector available for performing mirror operations? Or has someone created their own connector for this purpose?
Can't Start Kafka MirrorMaker
Hey guys, I am trying to move data from one cluster to another:

             Source    Destination
*Zookeeper*   2181      2182
*Kafka*       9092      9091

*Consumer properties:*
bootstrap.servers=localhost:9092
group.id=test-consumer-group
auto.offset.rest=earliest

*Producer properties:*
bootstrap.servers=localhost:9091
compression.type=none

I have a topic *actor* on 9092 (loaded from the MySQL Sakila schema's actor table); 9091 has no topics yet, so I am trying to migrate the data from 9092 to 9091. It shows:

D:\kafka>.\bin\windows\kafka-mirror-maker.bat --consumer.config .\config\consumer.properties --producer.config .\config\producer.properties --whitelist actor
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2020-05-01 12:04:55,024] WARN The configuration 'auto.offset.rest' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)

*It is stuck at that point and no more operations are performed*, and the topic is not copied to 9091. I don't know why. Can anyone identify the problem and explain a way to do this, please?
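The second WARN line in the log already names a likely cause: the consumer properties contain `auto.offset.rest`, a misspelling of `auto.offset.reset`, so the setting is ignored and the consumer keeps the default `latest` — meaning it waits for *new* messages and never reads the ones already in the topic. A corrected consumer.properties fragment:

```properties
bootstrap.servers=localhost:9092
group.id=test-consumer-group
# note the spelling: auto.offset.reset, not auto.offset.rest
auto.offset.reset=earliest
```

Also note `auto.offset.reset` only applies when the group has no committed offsets yet; if `test-consumer-group` has already committed, use a fresh group id to re-read from the beginning.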
Re: Kafka Mirror Maker 2
Hi Himanshu, can you please tell me how to use MM2? I am using Apache Kafka, and only the normal MirrorMaker seems to be available in it. Most people say to use MM2, but I could not find where to get it. Is it from Apache or from some other distributor? Can you please explain how to install that version? On Thu, Apr 30, 2020, 01:35 Himanshu Tyagi wrote: > Hey Team, > I've a few doubts regarding how the producers work after failover in Mirror > Maker 2 > > Let's say that we have two clusters K1 and K2 and configured MM2 > replication for TOPIC1 (originally created in just K1). > > We configured the active-active replication: > > K1->K2.enabled = true > K2->K1.enabled = true > K1->K2.topics = .* > K2->K1.topics = .* > > On starting mirror maker 2, I see that topics are replicated from cluster > K1 > to K2 in the naming format K1.topic_name_here and vice versa for topics > from cluster K2 to K1. > > I see that there was no topic TOPIC1 created in K2, only K1.TOPIC1 was > created. I see this scenario working for consumers, as in the beginning the > consumers consume TOPIC1 from cluster K1. When the cluster K1 out of > service, fail over happens. Consumers start to consume K1.TOPIC1 from K2. > > My questions are as follows: > >1. For producers, they won't producer to the topic K1.TOPIC1 in cluster >K2, my question is how do the producers go about producing data. Do I > need >to manually create a topic TOPIC1 in cluster K2 and this topic will be >used for producing messages to when failover happens. >2. If the above scenario is true, how do I move back to my primary >cluster K1. As, now the topic TOPIC1 in cluster K2 has digressed from >the topic TOPIC in K1. How do we sync the messages in this scenario ? > > > -- > Best, > Himanshu Tyagi > Contact No.- +1 480 465 0625 >
Re: Apache Kafka cluster to cluster
I start the Kafka mirror using the configuration below:

.\bin\windows\kafka-mirror-maker.bat --consumer.config .\config\consumer.properties --producer.config .\config\producer.properties --whitelist=".*"

*Consumer.properties:*
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# consumer group id
group.id=test-consumer-group

*Producer.properties:*
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9091
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none

I get the warning below, and it stays at that statement for more than 30 minutes; no data is transferred.

WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'

Can anyone clarify why I am getting this and what I am doing wrong? On Thu, Apr 30, 2020 at 12:22 AM vishnu murali wrote: > Thanks Blake.. > > More over can we use any connector types for this situation? > > Like source is a topic and sink is also an another topic > > Is this possible... > > On Thu, Apr 30, 2020, 00:19 Blake Miller wrote: > >> Hi Vishnu, >> >> Check out MirrorMaker >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330 >> >> This can do what you want. Note that the offsets are not copied, nor are >> the message timestamps. >> >> HTH >> >> >> On Wed, Apr 29, 2020 at 6:47 PM vishnu murali > > >> wrote: >> >> > Hi Guys, >> > >> > I am having two separate Kafka cluster running in two independent >> zookeeper >> > >> > I need to send a set of data from one topic from cluster A to cluster B >> > with the same topic name with all data also.. 
>> > >> > How can I achieve this >> > >> > Done anyone have any idea ?? >> > >> >
Re: JDBC source connector to Kafka topic
I am using plain Apache Kafka only, not Confluent. I start Kafka step by step: .\bin\windows\kafka-server-start.bat .\config\server.properties. That is how I start it, and even without any shutdown I still need to send the configuration details through a POST request every time I want to get the data from the DB. On Thu, Apr 30, 2020, 02:04 Robin Moffatt wrote: > How are you running Kafka? Do you mean when you shut it down you have to > reconfigure the connector? > > > -- > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff > > > On Wed, 29 Apr 2020 at 20:03, vishnu murali > wrote: > > > Hi guys > > I am trying that JDBC source connector to get data from MySQL and send > as a > > data in a topic,so here what I am facing is there is more manual here > > > > After starting zookeeper,server, connect-distributed in Apache kafka > > > > I need to give Post request every time to the localhost:8083/connectors > > with the request body of config details when I need data and also all > data > > will come again and again.. > > > > Is there any way to achieve this CDC? > > >
JDBC source connector to Kafka topic
Hi guys, I am trying the JDBC source connector to get data from MySQL and send it to a topic, and what I am facing is that there is a lot of manual work. After starting ZooKeeper, the server, and connect-distributed in Apache Kafka, I need to send a POST request to localhost:8083/connectors with the config details in the request body every time I need data, and all the data comes again and again. Is there any way to achieve CDC here?
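Two points worth noting here. First, Connect in distributed mode persists connector configs in its internal topics, so the POST is needed only once; after that the connector keeps polling on its own, even across worker restarts. Second, the "all data comes again and again" behavior is what *bulk* mode does — switching to an incremental mode makes the connector emit only new or changed rows. A sketch assuming the table has an auto-increment `id` and an `updated_at` timestamp column (the column names and URL are placeholders):

```json
{
  "name": "mysql-incremental",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/sample",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "topic.prefix": "mysql-",
    "poll.interval.ms": "5000"
  }
}
```

For true log-based CDC (including deletes), Debezium's MySQL connector is the usual alternative to query-based polling.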
One cluster topic to another cluster topic
Hi guys, I have two separate Kafka clusters running under two independent ZooKeepers. I need to send a set of data from one topic in cluster A to cluster B, keeping the same topic name and all the data. How can I achieve this? Does anyone have any idea?
Re: Apache Kafka cluster to cluster
Thanks Blake. Moreover, can we use any connector type for this situation — like the source is a topic and the sink is also another topic? Is this possible? On Thu, Apr 30, 2020, 00:19 Blake Miller wrote: > Hi Vishnu, > > Check out MirrorMaker > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330 > > This can do what you want. Note that the offsets are not copied, nor are > the message timestamps. > > HTH > > > On Wed, Apr 29, 2020 at 6:47 PM vishnu murali > wrote: > > > Hi Guys, > > > > I am having two separate Kafka cluster running in two independent > zookeeper > > > > I need to send a set of data from one topic from cluster A to cluster B > > with the same topic name with all data also.. > > > > How can I achieve this > > > > Done anyone have any idea ?? > > >
Apache Kafka cluster to cluster
Hi guys, I have two separate Kafka clusters running under two independent ZooKeepers. I need to send a set of data from one topic in cluster A to cluster B, keeping the same topic name and all the data. How can I achieve this? Does anyone have any idea?