Jdbc Source Connector Holding more active sessions

2021-04-19 Thread vishnu murali
Hi everyone

We are using the Kafka JDBC connector to connect to a Redshift database.

We created three connectors to pull data from three different tables into
three different topics.

We observed that each connector holds multiple active sessions.

Even after we delete the connectors, the active sessions remain and are
not closed.

Has anyone faced the same kind of issue?

Is there any configuration available to restrict this kind of behaviour?

Does anyone know how to overcome this issue?


How to get Date format value before 1970 in Kafka connect

2020-10-23 Thread vishnu murali
Hi guys

I am using the JDBC source connector to take data from AWS Redshift to Kafka.

There I have a field with the data type Date.

Values after 1970 work fine.

But if the value is before 1970, it produces 00:00:00:Z.

May I know how to solve this problem?

Has anyone experienced this scenario?
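
Kafka Connect's Date logical type represents a date as a signed 32-bit count of days since 1970-01-01, so a date before 1970 is simply a negative number. Whether the connector, the converter, or the consuming code mishandles the sign in this case is not clear from the message. A minimal sketch in plain Java, using hypothetical helper names, of converting such day counts in both directions:

```java
import java.time.LocalDate;

public class EpochDayDemo {
    // Hypothetical helper: turns a signed days-since-epoch value into a calendar date.
    public static LocalDate fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days);
    }

    // Hypothetical helper: the reverse direction; negative for dates before 1970.
    public static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        System.out.println(fromEpochDays(0));                       // 1970-01-01
        System.out.println(fromEpochDays(-1));                      // 1969-12-31
        System.out.println(toEpochDays(LocalDate.of(1969, 1, 1)));  // -365
    }
}
```

If the consuming code treats the value as unsigned, or clamps it at zero, every pre-1970 date would collapse to the epoch, which may be one thing worth checking here.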


How to use SMT to add header in my messages?

2020-08-03 Thread vishnu murali
Hi everyone

I am getting data from the SFTP CSV source connector.

I need to add a header to each message so that it can be consumed on the Java side.

Does anyone know how to add a header to the message while using the connector?

Thanks in advance.


Problem while sending data

2020-07-20 Thread vishnu murali
Hi all

I am trying to send data from a Kafka Java producer in Avro format.

When I try to send the data, it is not sent.

The statements before and after the send execute correctly, but the send
itself does not work.

The schema, however, is registered successfully.


There are no logs or error messages.

Other Kafka applications are working fine.

Does anyone have any idea about this?

20:02:14.059 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Initiating connection to node localhost:9092 (id: -1 rack: null) using
address localhost/127.0.0.1

20:02:14.062 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer -
[Producer clientId=producer-1] Kafka producer started

20:02:14.075 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.common.network.Selector - [Producer clientId=producer-1]
Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0
to node -1

20:02:14.217 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Completed connection to node -1. Fetching API versions.

20:02:14.217 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Initiating API versions fetch from node -1.

20:02:14.345 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Recorded API versions for node -1: (Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5],
Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4],
StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6],
ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable:
8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3
[usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4
[usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5
[usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3
[usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3
[usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4
[usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0
to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1
[usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0
[usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0
to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to
2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0
to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2
[usable: 1], CreatePartitions(37): 0 to 2 [usable: 1],
CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0
to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1],
DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2
[usable: 2], ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable:
0], UNKNOWN(1): 0)

20:02:14.346 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Sending metadata request MetadataRequestData(topics=[],
allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false,
includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1
rack: null)

20:02:14.357 [kafka-producer-network-thread | producer-1] INFO
org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster
ID: ugUXR7FWR7uXIGWcpJYdLA

20:02:14.357 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Updated
cluster metadata updateVersion 2 to
MetadataCache{clusterId='ugUXR7FWR7uXIGWcpJYdLA', nodes=[localhost:9092
(id: 1 rack: null)], partitions=[], controller=localhost:9092 (id: 1 rack:
null)}


*GOING TO INSERT*

20:02:14.386 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Initialize connection to node localhost:9092 (id: 1 rack: null) for sending
metadata request

20:02:14.386 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Initiating connection to node localhost:9092 (id: 1 rack: null) using
address localhost/127.0.0.1

20:02:14.389 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.common.network.Selector - [Producer clientId=producer-1]
Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0
to node 1


Re: Schema validation problem

2020-07-15 Thread vishnu murali
When I check the Kafka logs, I see this:


Invalid record due to REST client error
(io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator)

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
This ID is banned; error code: 40403

at
io.confluent.kafka.schemaregistry.validator.LruSchemaRegistryClient.getSchemaByIdFromRegistry(LruSchemaRegistryClient.java:188)

On Thu, Jul 16, 2020, 09:31 vishnu murali 
wrote:

> Hi all
>
> My schema is:
>
>
>
> {
>   "fields": [
>
> {
>   "name": "ID",
>
>   "type": "long"
>
> },
>
> {
>   "name": "Group",
>
>   "type": [
>
> "null",
>
> {
>   "avro.java.string": "String",
>
>   "type": "string"
>
> }
>
>   ]
>
> },
>
> {
>   "name": "Key",
>
>   "type": [
>
> "null",
>
> {
>   "avro.java.string": "String",
>
>   "type": "string"
>
> }
>
>   ]
>
> },
>
> {
>   "name": "Value",
>
>   "type": [
>
> "null",
>
> {
>   "avro.java.string": "String",
>
>   "type": "string"
>
> }
>
>   ]
>
> }
>
>   ],
>
>   "name": "Runconfig",
>
>   "namespace": "com.ns.vishnu.kafkaavorest.Model",
>
>   "type": "record"
>
> }
>
>
>
>
>
> My Data is:
>
> Runconfig rc = Runconfig.newBuilder()
> .setID(1L)
> .setGroup("new")
> .setKey("year")
> .setValue("11")
> .build();
>
>
>
> runTemlate.send(“n-run”,rc);
>
>
>
>
>
> Exception will be: org.apache.kafka.common.InvalidRecordException: This
> record has failed the validation on broker and hence be rejected.
>
>
>
>
>
> What is the validation mistake in this ?
>
> I checked  every datatype it is correct only!!
>
> Any suggestions on this !!
>


Re: Kafka SFTP connector

2020-07-15 Thread vishnu murali
Thanks a lot Ricardo

I will try that 

On Wed, Jul 15, 2020, 19:11 Ricardo Ferreira  wrote:

> The `tls.private.key` type is indeed modeled as a password but for the
> sake of how to assign values; it is just a string. Therefore, you can
> provide any valid string to it regardless if it is long or not.
>
> Regarding escaping, I understand how this can be a PITA. I would recommend
> either:
>
> 1. *Using Postman variables*: define a variable with the public key and
> then reference in the payload using the `{{$var.name}}` notation.
>
> 2. *Use a Bastion Server and cURL*: you can use a bastion server to SSH
> from your machine and then have access to the machine that hosts your Kafka
> Connect server. While in there; you can use cURL to execute the POST
> command along with the `--data-urlencode` parameter.
>
> Thanks,
>
> -- Ricardo
> On 7/14/20 11:30 PM, vishnu murali wrote:
>
> Hi Ricardo
>
> Thanks for the response
>
> But the tls.private.key type was a password and I am giving request
> through postman
>
> In this place how can we give that public key value in postman as a string
>
> That public key having so many characters which not included within that
> string double quotes..
>
> More escape sequence will be there in the public key
>
> In this situation do u know how can we use this??
>
> On Wed, Jul 15, 2020, 02:06 Ricardo Ferreira 
> wrote:
>
>> Vishnu,
>>
>> A public key file can be specified via the property `tls.public.key`.
>> Thanks,
>>
>> -- Ricardo
>> On 7/14/20 6:09 AM, vishnu murali wrote:
>>
>> Hi all,
>>
>> I am using SFTP connector which that SFTP connection can be accessed by
>> using public key file.
>>
>> How can I give this configuration in postman to start sftp connector?
>>
>> Anyone have any suggestions?
>>
>>
>>


Schema validation problem

2020-07-15 Thread vishnu murali
Hi all

My schema is:

{
  "fields": [
    {
      "name": "ID",
      "type": "long"
    },
    {
      "name": "Group",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    },
    {
      "name": "Key",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    },
    {
      "name": "Value",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    }
  ],
  "name": "Runconfig",
  "namespace": "com.ns.vishnu.kafkaavorest.Model",
  "type": "record"
}

My data is:

Runconfig rc = Runconfig.newBuilder()
        .setID(1L)
        .setGroup("new")
        .setKey("year")
        .setValue("11")
        .build();

runTemlate.send("n-run", rc);

The exception is: org.apache.kafka.common.InvalidRecordException: This
record has failed the validation on broker and hence be rejected.

What is the validation mistake here?

I checked every data type and they are all correct.

Any suggestions on this?


Namespace problem in schema registry

2020-07-14 Thread vishnu murali
Hi all,

I have a question about namespaces in Schema Registry.

If the schema is generated automatically by the JDBC source connector, the
schema doesn't have a namespace field and value.

But if we create the schema manually with a namespace, register it in
Schema Registry, and then try to run the connector, a "Schema not found"
exception occurs.

How can we handle this? I need the namespace in order to deserialize in Java.

Does anyone have a solution for this?


Re: Kafka SFTP connector

2020-07-14 Thread vishnu murali
Hi Ricardo

Thanks for the response.

But the tls.private.key type is a password, and I am sending the request
through Postman.

In that case, how can we give the public key value in Postman as a string?

The public key has many characters that cannot be placed inside the
double quotes of a string.

The public key would need many escape sequences.

In this situation, do you know how we can do this?

On Wed, Jul 15, 2020, 02:06 Ricardo Ferreira  wrote:

> Vishnu,
>
> A public key file can be specified via the property `tls.public.key`.
> Thanks,
>
> -- Ricardo
> On 7/14/20 6:09 AM, vishnu murali wrote:
>
> Hi all,
>
> I am using SFTP connector which that SFTP connection can be accessed by
> using public key file.
>
> How can I give this configuration in postman to start sftp connector?
>
> Anyone have any suggestions?
>
>
>


Kafka SFTP connector

2020-07-14 Thread vishnu murali
Hi all,

I am using the SFTP connector, and the SFTP connection is accessed using a
public key file.

How can I provide this configuration in Postman to start the SFTP connector?

Does anyone have any suggestions?


Schema validation in JDBC Connector

2020-07-10 Thread vishnu murali
Hi Guys

First I create a topic and set a schema for it, and then I try to
take data from MySQL using the JDBC source connector.

At that point, how can I validate that the data from MySQL matches the schema I
set in Schema Registry?

Does anyone have any idea about this?


Re: Problem in reading From JDBC SOURCE

2020-07-03 Thread vishnu murali
Hi


Below is the script I used to create the table in MySQL:

CREATE TABLE `sample` (
  `id` varchar(45) NOT NULL,
  `a` decimal(10,3) DEFAULT NULL,
  `b` decimal(10,3) DEFAULT NULL,
  `c` decimal(10,3) DEFAULT NULL,
  `d` decimal(10,3) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Table data:

id: 1
a: 0.002
b: 2.250
c: 0.789
d: 0.558

On Thu, Jul 2, 2020, 19:50 Ricardo Ferreira  wrote:

> Vishnu,
>
> I think is hard to troubleshoot things without the proper context. In
> your case, could you please share an example of the rows contained in
> the table `sample`? As well as its DDL?
>
> -- Ricardo
>
> On 7/2/20 9:29 AM, vishnu murali wrote:
> > I go through that documentation
> >
> > Where it described like DECIMAL is not supported in MySQL  like this .
> >
> > And also no example for MySQL so is there any other sample with MySQL
> >
> >
> >
> > On Thu, Jul 2, 2020, 18:49 Robin Moffatt  wrote:
> >
> >> Check out this article where it covers decimal handling:
> >>
> >>
> https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics
> >>
> >>
> >> --
> >>
> >> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >>
> >>
> >> On Thu, 2 Jul 2020 at 13:54, vishnu murali 
> >> wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I am having some problem while reading from MySQL using JDBC source and
> >>> received like below
> >>> Anyone know what is the reason and how to solve this ?
> >>>
> >>> "a": "Aote",
> >>>
> >>>"b": "AmrU",
> >>>
> >>>"c": "AceM",
> >>>
> >>>"d": "Aote",
> >>>
> >>>
> >>> Instead of
> >>>
> >>> "a": 0.002,
> >>>
> >>>"b": 0.465,
> >>>
> >>>"c": 0.545,
> >>>
> >>>"d": 0.100
> >>>
> >>>
> >>> It's my configuration
> >>>
> >>>
> >>> {
> >>>
> >>>  "name": "sample",
> >>>
> >>>  "config": {
> >>>
> >>>  "connector.class":
> >> "io.confluent.connect.jdbc.JdbcSourceConnector",
> >>>  "connection.url": "jdbc:mysql://localhost:3306/sample",
> >>>
> >>>  "connection.user": "",
> >>>
> >>>  "connection.password": "xxx",
> >>>
> >>>  "topic.prefix": "dample-",
> >>>
> >>>  "poll.interval.ms": 360,
> >>>
> >>>  "table.whitelist": "sample",
> >>>
> >>>  "schemas.enable": "false",
> >>>
> >>>  "mode": "bulk",
> >>>
> >>>  "value.converter.schemas.enable": "false",
> >>>
> >>>  "numeric.mapping": "best_fit",
> >>>
> >>>  "value.converter":
> "org.apache.kafka.connect.json.JsonConverter",
> >>>
> >>>  "transforms": "createKey,extractInt",
> >>>
> >>>  "transforms.createKey.type":
> >>> "org.apache.kafka.connect.transforms.ValueToKey",
> >>>
> >>>  "transforms.createKey.fields": "ID",
> >>>
> >>>  "transforms.extractInt.type":
> >>> "org.apache.kafka.connect.transforms.ExtractField$Key",
> >>>
> >>>  "transforms.extractInt.field": "ID"
> >>>
> >>>  }
> >>>
> >>> }
> >>>
>


Re: Problem in reading From JDBC SOURCE

2020-07-02 Thread vishnu murali
I went through that documentation.

It says something like DECIMAL is not supported in MySQL.

Also, there is no example for MySQL. Is there any other sample with MySQL?



On Thu, Jul 2, 2020, 18:49 Robin Moffatt  wrote:

> Check out this article where it covers decimal handling:
>
> https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Thu, 2 Jul 2020 at 13:54, vishnu murali 
> wrote:
>
> > Hi Guys,
> >
> > I am having some problem while reading from MySQL using JDBC source and
> > received like below
> > Anyone know what is the reason and how to solve this ?
> >
> > "a": "Aote",
> >
> >   "b": "AmrU",
> >
> >   "c": "AceM",
> >
> >   "d": "Aote",
> >
> >
> > Instead of
> >
> > "a": 0.002,
> >
> >   "b": 0.465,
> >
> >   "c": 0.545,
> >
> >   "d": 0.100
> >
> >
> > It's my configuration
> >
> >
> > {
> >
> > "name": "sample",
> >
> > "config": {
> >
> > "connector.class":
> "io.confluent.connect.jdbc.JdbcSourceConnector",
> >
> > "connection.url": "jdbc:mysql://localhost:3306/sample",
> >
> > "connection.user": "",
> >
> > "connection.password": "xxx",
> >
> > "topic.prefix": "dample-",
> >
> > "poll.interval.ms": 360,
> >
> > "table.whitelist": "sample",
> >
> > "schemas.enable": "false",
> >
> > "mode": "bulk",
> >
> > "value.converter.schemas.enable": "false",
> >
> > "numeric.mapping": "best_fit",
> >
> > "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >
> > "transforms": "createKey,extractInt",
> >
> > "transforms.createKey.type":
> > "org.apache.kafka.connect.transforms.ValueToKey",
> >
> > "transforms.createKey.fields": "ID",
> >
> > "transforms.extractInt.type":
> > "org.apache.kafka.connect.transforms.ExtractField$Key",
> >
> > "transforms.extractInt.field": "ID"
> >
> > }
> >
> > }
> >
>


Problem in reading From JDBC SOURCE

2020-07-02 Thread vishnu murali
Hi Guys,

I am having a problem while reading from MySQL using the JDBC source connector.
I receive values like the ones below.
Does anyone know the reason and how to solve this?

"a": "Aote",
"b": "AmrU",
"c": "AceM",
"d": "Aote",


Instead of

"a": 0.002,
"b": 0.465,
"c": 0.545,
"d": 0.100


This is my configuration:

{
  "name": "sample",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/sample",
    "connection.user": "",
    "connection.password": "xxx",
    "topic.prefix": "dample-",
    "poll.interval.ms": 360,
    "table.whitelist": "sample",
    "schemas.enable": "false",
    "mode": "bulk",
    "value.converter.schemas.enable": "false",
    "numeric.mapping": "best_fit",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "ID",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "ID"
  }
}
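
The short strings in place of the numbers look like base64 text. Kafka Connect's Decimal logical type carries only the unscaled integer as bytes (the scale lives in the schema), and a JSON converter that renders bytes as base64 would produce output of this shape. A minimal sketch of that round trip in plain Java, assuming a scale of 3 as in the decimal(10,3) columns mentioned elsewhere in this thread; the helper names are hypothetical and this is not the connector's own code:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DecimalBase64Demo {
    // Hypothetical helper: unscaled value -> big-endian bytes -> base64 text.
    public static String encode(BigDecimal value) {
        byte[] unscaled = value.unscaledValue().toByteArray();
        return Base64.getEncoder().encodeToString(unscaled);
    }

    // Hypothetical helper: the scale is not in the payload, so the caller supplies it.
    public static BigDecimal decode(String base64, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        String wire = encode(new BigDecimal("0.002"));
        System.out.println(wire);             // Ag==
        System.out.println(decode(wire, 3));  // 0.002
    }
}
```

A consumer that knows the scale can decode such values this way. Having the connector emit plain numbers instead is a configuration question that the article linked in the replies discusses.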


SFTP config Details as a JSON

2020-05-20 Thread vishnu murali
Hey Guys,

One of the values in the SFTP configuration is key.schema.

I am sending it through Postman as a JSON request.

How can I provide the schema details, given that every key and value in it
is wrapped in double quotes?

Could anyone explain?

If I give it like this, the following exception occurs.


 "key.schema": "{schema:{type:struct,fields:[],optional:false,name:defaultkeyschemaname},payload:{}}",

*Exception:*

org.apache.kafka.common.config.ConfigException: Invalid value
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s'
(code 115)): was expecting double-quote to start field name
 at [Source:
(String)"{schema:{type:struct,fields:[],optional:false,name:defaultkeyschemaname},payload:{}}";
line: 1, column: 3] for configuration Could not read schema from
'key.schema'


SFTP config problem

2020-05-20 Thread vishnu murali
Hey guys

I changed the properties in the SFTP CSV source and it is working fine.

Now I have set schema generation to true, so it adds the schema
to every record in the topic.

When I set schema generation to false, it asks for key.schema and value.schema.

But both are JSON, and I couldn't figure out how to give JSON as the
value of a JSON key in the SFTP configuration.

{
  "Schema": {
    "type": "struct",
    "fields": [],
    "optional": false,
    "name": "abc"
  }
}

How can I set this JSON as the value of key.schema in the JSON configuration?

I keep getting typo or punctuation errors.
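
A JSON document can be carried inside a JSON string only if its inner double quotes and backslashes are escaped. A minimal sketch in plain Java, using a hypothetical `escape` helper and the schema text from the message above; a JSON library would normally do this, and whether the connector accepts this particular schema layout is not verified here:

```java
public class JsonStringEscape {
    // Hypothetical helper: escapes the characters that most often break a
    // JSON string literal. It does not cover every control character.
    public static String escape(String raw) {
        StringBuilder out = new StringBuilder(raw.length() + 16);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:   out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Schema text copied from the message; its layout is not validated here.
        String schema = "{\"type\":\"struct\",\"fields\":[],\"optional\":false,\"name\":\"abc\"}";
        // Prints a line that can be pasted into the connector's JSON request body.
        System.out.println("\"key.schema\": \"" + escape(schema) + "\"");
    }
}
```

The printed line has every inner quote written as \", which is the form a JSON parser expects for a string that itself contains JSON.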


Re: Exception in SFTP CSV SOURCE

2020-05-19 Thread vishnu murali
I suppose I forwarded it by mistake.

So is there any solution for the exception above?

On Tue, May 19, 2020, 22:35 Robin Moffatt  wrote:

> Hi Vishnu,
>
> I think there is a problem with your email client, it's just sent a
> duplicate of each of your emails from yesterday?
>
> thanks, Robin.
>
> On Tue, 19 May 2020 at 16:44, vishnu murali 
> wrote:
>
> > Hi Guys
> >
> > By Trying SFTP CSV SOURCE i am getting this exception by using this
> > configuration.
> >
> >
> > what is the issue and how to resolve it?
> >
> > can anyone know?
> >
> > *Config:*
> > {
> > "name": "CsvSFTP1",
> > "config": {
> > "tasks.max": "1",
> > "connector.class":
> > "io.confluent.connect.sftp.SftpCsvSourceConnector",
> > "cleanup.policy": "NONE",
> > "behavior.on.error": "IGNORE",
> > "input.path": "/mnt/c/Users/vmuralidharan/Desktop",
> > "error.path": "/mnt/c/Users/vmuralidharan/Desktop/error",
> > "finished.path": "/mnt/c/Users/vmuralidharan/Desktop/finished",
> > "input.file.pattern": "orm.csv",
> > "kafka.topic": "sftp-testing-topic",
> > "csv.first.row.as.header": "true",
> > "schema.generation.enabled": "false"
> > }
> > }
> >
> > *Exception StackTrace:*
> > org.apache.kafka.connect.errors.ConnectException: Can not get connection to SFTP:
> >     at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:115)
> >     at io.confluent.connect.sftp.connection.SftpConnection.<init>(SftpConnection.java:66)
> >     at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.getSftpConnection(AbstractSftpSourceConnectorConfig.java:557)
> >     at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.<init>(AbstractSftpSourceConnectorConfig.java:242)
> >     at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:101)
> >     at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:157)
> >     at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:47)
> >     at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
> >     at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
> >     at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
> >     at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:260)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)
> >     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)
> >     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >     at java.base/java.lang.Thread.run(Thread.java:834)
> > Caused by: com.jcraft.jsch.JSchException: java.net.ConnectException: Connection refused (Connection refused)
> >     at com.jcraft.jsch.Util.createSocket(Util.java:394)
> >     at com.jcraft.jsch.Session.connect(Session.java:215)
> >     at com.jcraft.jsch.Session.connect(Session.java:183)
> >     at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:109)
> >     ... 18 more
> > Caused by: java.net.ConnectException: Connection refused (Connection refused)
> >     at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
> >     at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
> >     at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
> >     at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
> >     at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
> >     at java.base/java.net.Socket.connect(Socket.java:609)
> >     at java.base/java.net.Socket.connect(Socket.java:558)
> >     at java.base/java.net.Socket.<init>(Socket.java:454)
> >     at java.base/java.net.Socket.<init>(Socket.java:231)
> >     at com.jcraft.jsch.Util$1.run(Util.java:362)
> >
>


Exception in SFTP CSV SOURCE

2020-05-18 Thread vishnu murali
Hi Guys

When I try the SFTP CSV source connector with the configuration below, I get
this exception.


What is the issue, and how can I resolve it?

Does anyone know?

*Config:*
{
  "name": "CsvSFTP1",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy": "NONE",
    "behavior.on.error": "IGNORE",
    "input.path": "/mnt/c/Users/vmuralidharan/Desktop",
    "error.path": "/mnt/c/Users/vmuralidharan/Desktop/error",
    "finished.path": "/mnt/c/Users/vmuralidharan/Desktop/finished",
    "input.file.pattern": "orm.csv",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "false"
  }
}

*Exception StackTrace:*
org.apache.kafka.connect.errors.ConnectException: Can not get connection to SFTP:
    at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:115)
    at io.confluent.connect.sftp.connection.SftpConnection.<init>(SftpConnection.java:66)
    at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.getSftpConnection(AbstractSftpSourceConnectorConfig.java:557)
    at io.confluent.connect.sftp.source.AbstractSftpSourceConnectorConfig.<init>(AbstractSftpSourceConnectorConfig.java:242)
    at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:101)
    at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:157)
    at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:47)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:260)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.jcraft.jsch.JSchException: java.net.ConnectException: Connection refused (Connection refused)
    at com.jcraft.jsch.Util.createSocket(Util.java:394)
    at com.jcraft.jsch.Session.connect(Session.java:215)
    at com.jcraft.jsch.Session.connect(Session.java:183)
    at io.confluent.connect.sftp.connection.SftpConnection.init(SftpConnection.java:109)
    ... 18 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at java.base/java.net.Socket.connect(Socket.java:558)
    at java.base/java.net.Socket.<init>(Socket.java:454)
    at java.base/java.net.Socket.<init>(Socket.java:231)
    at com.jcraft.jsch.Util$1.run(Util.java:362)


SFTP Connector

2020-05-18 Thread vishnu murali
Hey Guys,

I am now trying to set up the SFTP connector using the configuration below.

I am on a Windows system, so I am not sure how to set the paths.

I tried the Windows-style paths shown in the *config* below as well as
*/mnt/c/users/vmuralidharan*, but neither works.

What do I need to do?

{
"name": "CsvSFTP11",
"config": {
"tasks.max": "1",
"connector.class":
"io.confluent.connect.sftp.SftpCsvSourceConnector",
"cleanup.policy": "NONE",
"behavior.on.error": "IGNORE",
"input.path": " C:\\Users\\vmuralidharan",
"error.path": "C:\\Users\\vmuralidharan\\Desktop\\error",
"finished.path": "C:\\Users\\vmuralidharan\\Desktop\\finished",
"input.file.pattern": "purchase.csv",
"kafka.topic": "sftp-testing-topic",
"csv.first.row.as.header": "true",
"schema.generation.enabled": "false"
}
}

How can I solve this?
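A minimal sketch of the path translation discussed above, assuming the SFTP server runs under WSL, where the Windows drive C: is mounted at /mnt/c. That assumption is not confirmed in this thread, and the helper name windows_to_wsl is hypothetical. It only shows how a Windows-style path maps to the /mnt/c form; it does not check that the SFTP server can actually see the directory.

```python
from pathlib import PurePosixPath, PureWindowsPath


def windows_to_wsl(path: str) -> str:
    """Map a Windows path such as C:\\Users\\me to /mnt/c/Users/me.

    Assumes the WSL convention of mounting drive X: at /mnt/x.
    Leading and trailing whitespace is stripped first, because a stray
    space inside a JSON string value becomes part of the path.
    """
    win = PureWindowsPath(path.strip())
    if not win.drive:
        raise ValueError(f"not an absolute Windows path: {path!r}")
    drive_letter = win.drive.rstrip(":").lower()
    # win.parts[0] is the anchor ("C:\\"); the rest are the path segments.
    return str(PurePosixPath("/mnt", drive_letter, *win.parts[1:]))


if __name__ == "__main__":
    print(windows_to_wsl(r"C:\Users\vmuralidharan\Desktop\error"))
    # /mnt/c/Users/vmuralidharan/Desktop/error
```

The paths in the connector config are resolved on the SFTP server, so whichever form is used has to be valid on that side.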


Problem in Docker

2020-05-15 Thread vishnu murali
Hi, I am running the cp-all-in-one Docker setup for Confluent Kafka.

There I am trying out the JDBCSourceConnector.

It returns this result:
{
"error_code": 400,
"message":
"Connector configuration is invalid and contains the following 2
error(s):\nInvalid value java.sql.SQLException: No suitable driver
found for jdbc:mysql://localhost:3306/sample for configuration
Couldn't open connection to
jdbc:mysql://localhost:3306/sample\nInvalid value
java.sql.SQLException: No suitable driver found for
jdbc:mysql://localhost:3306/sample for configuration Couldn't open
connection to jdbc:mysql://localhost:3306/sample\nYou can also find
the above list of errors at the endpoint
`/connector-plugins/{connectorType}/config/validate`"
}


How can I add an external JAR file to that Docker container?

I am new to Docker and do not even know whether this is possible.

Does anyone know the solution?


Re: JDBC source connector

2020-05-14 Thread vishnu murali
Thanks Liam

But my question is this: assume the table has 10 records.

Using the JDBC source I need to push them once.

No additional data will be added to that table in the future.

In that case I need to push the data only once, not more than once.

That is the scenario I am asking about.

On Thu, May 14, 2020, 19:20 Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Why not use autoincrement? It'll only emit new records on subsequent polls
> then.
>
> On Thu, 14 May 2020, 11:15 pm vishnu murali, 
> wrote:
>
> > Hi Guys,
> >
> > I am using the mode *bulk  *and poll.interval.ms *10* in the Source
> > connector configuration.
> >
> > But I don't need to load data another time.?
> >
> > I need to load the data only once ??
> >
> > How can I able to do this ?
> >
>


JDBC source connector

2020-05-14 Thread vishnu murali
Hi Guys,

I am using mode *bulk* and poll.interval.ms *10* in the source
connector configuration.

But I do not want the data to be loaded again.

I need to load the data only once.

How can I do this?
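The reply in this thread suggests an incrementing column instead of bulk mode, so that later polls only emit rows with a higher id than the last one seen. A minimal sketch of building such a request body follows. The config keys are the JDBC source connector's own; the connector name, URL, table, column and topic prefix are placeholder values, not taken from this thread.

```python
import json


def jdbc_source_config(name, connection_url, table, id_column, topic_prefix):
    """Build the JSON body for POST /connectors using incrementing mode."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": connection_url,
            "table.whitelist": table,
            # Only rows whose id is greater than the last stored offset are
            # emitted, so a table that never changes is loaded once.
            "mode": "incrementing",
            "incrementing.column.name": id_column,
            "topic.prefix": topic_prefix,
            "poll.interval.ms": "5000",
        },
    }


if __name__ == "__main__":
    body = jdbc_source_config(
        name="jdbc-source-example",  # hypothetical connector name
        connection_url="jdbc:mysql://localhost:3306/sample",
        table="actor",
        id_column="actor_id",
        topic_prefix="mysql-",
    )
    print(json.dumps(body, indent=2))
```

This relies on the table having a strictly increasing numeric column; without one, bulk mode re-reads the whole table on every poll.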


Jdbc Source Connector Config

2020-05-13 Thread vishnu murali
Hi Guys,

I have a question.


"transforms":"createKey,extractInt",

"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
 "transforms.createKey.fields":"id",

"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
 "transforms.extractInt.field":"id"

Why are these settings needed in the JdbcSourceConnector?
Can the source connector be used without them?
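To illustrate what the two transforms quoted above do, here is a small simulation in plain Python. It does not use Kafka Connect; the functions value_to_key and extract_field_key are hypothetical stand-ins that mimic ValueToKey and ExtractField$Key on a dict-shaped record. As far as I know the transforms are optional: without them the JDBC source connector still runs, and the records simply carry no key.

```python
def value_to_key(record, fields):
    """Mimic ValueToKey: build a struct-like key from the listed value fields."""
    key = {field: record["value"][field] for field in fields}
    return {**record, "key": key}


def extract_field_key(record, field):
    """Mimic ExtractField$Key: replace the struct key with one of its fields."""
    return {**record, "key": record["key"][field]}


if __name__ == "__main__":
    # A source record as it might leave the connector: no key, row in the value.
    record = {"key": None, "value": {"id": 42, "name": "PENELOPE"}}

    after_create_key = value_to_key(record, ["id"])
    after_extract_int = extract_field_key(after_create_key, "id")

    print(after_create_key["key"])   # {'id': 42}
    print(after_extract_int["key"])  # 42
```

The chain order matters: createKey must run first, because extractInt reads the field out of the key that createKey produced.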


Re: Custom Connector

2020-05-12 Thread vishnu murali
Thanks Tom

I am receiving data from a REST endpoint and want to post that data to a
topic.


Is that possible, or is there another connector available for it?

On Tue, May 12, 2020, 13:24 Tom Bentley  wrote:

> Hi Vishnu,
>
> These are probably a good place to start:
> 1. https://docs.confluent.io/current/connect/devguide.html
> 2.
>
> https://www.confluent.io/blog/create-dynamic-kafka-connect-source-connectors/
>
> Cheers,
>
> Tom
>
>
> On Tue, May 12, 2020 at 7:34 AM vishnu murali 
> wrote:
>
> > Hi Guys,
> >
> > i am trying to create a new connector for my own purpose.
> >
> > Is there any guide or document which show how to create a own connector
> and
> > use?
> >
>


Need a connector to listening Rest Endpoint

2020-05-12 Thread vishnu murali
Hi friends,

I have a REST endpoint that receives data continuously.

I need to send that data to a Kafka topic.

I want to solve this with a connector, because I do not want to run another
application that receives data from the REST endpoint and sends it to Kafka.

Instead of running a separate application that pushes the data into the
topic, is there a connector that can listen to that endpoint and push the
data into a topic automatically?


Custom Connector

2020-05-12 Thread vishnu murali
Hi Guys,

I am trying to create a new connector for my own purpose.

Is there a guide or document that shows how to create and use your own
connector?


Re: JDBC SINK SCHEMA

2020-05-11 Thread vishnu murali
Hi Robin

Is it possible to integrate Apache Kafka with the Confluent Schema
Registry as you said?

I don't know how to do it. Can you give me a reference?

On Mon, May 11, 2020, 14:09 Robin Moffatt  wrote:

> You can use Apache Kafka as you are currently using, and just deploy Schema
> Registry alongside it.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Sat, 9 May 2020 at 02:16, Chris Toomey  wrote:
>
> > You have to either 1) use one of the Confluent serializers
> > <
> >
> https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#
> > >
> > when you publish to the topic, so that the schema (or reference to it) is
> > included, or 2) write and use a custom converter
> > <
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/storage/Converter.html
> > >
> > that knows about the data schema and can take the kafka record value and
> > convert it into a kafka connect record (by implementing the toConnectData
> > converter method), which is what the sink connectors are driven from.
> >
> > See https://docs.confluent.io/current/connect/concepts.html#converters
> >
> > Chris
> >
> >
> >
> > On Fri, May 8, 2020 at 6:59 AM vishnu murali  >
> > wrote:
> >
> > > Hey Guys,
> > >
> > > I am *using Apache **2.5 *not confluent.
> > >
> > > i am trying to send data from topic to database using jdbc sink
> > connector.
> > >
> > > we need to send that data with the appropriate schema also.
> > >
> > > i am *not using confluent version* of kafka.
> > >
> > > so can anyone explain how can i do this ?
> > >
> >
>


JDBC SINK SCHEMA

2020-05-08 Thread vishnu murali
Hey Guys,

I am *using Apache **2.5 *not confluent.

i am trying to send data from topic to database using jdbc sink connector.

we need to send that data with the appropriate schema also.

i am *not using confluent version* of kafka.

so can anyone explain how can i do this ?


Re: JDBC Sink Connector

2020-05-08 Thread vishnu murali
Thank you so much Robin

It helped me a lot in defining the sink connector with upsert mode.

I still do not properly understand the schema-related part.

Because I am using plain Apache Kafka, I don't know whether Schema
Registry, ksql and the Avro serializers are present in Apache Kafka (2.5).

I suppose Schema Registry and ksql come with the Confluent distribution
of Kafka.

On Thu, May 7, 2020 at 1:47 PM Robin Moffatt  wrote:

> If you don't want to send the schema each time then serialise your data
> using Avro (or Protobuf), and then the schema is held in the Schema
> Registry. See https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
>
> If you want to update a record instead of insert, you can use the upsert
> mode. See https://www.youtube.com/watch?v=b-3qN_tlYR4&t=627s
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Thu, 7 May 2020 at 06:48, vishnu murali 
> wrote:
>
> > Hey Guys,
> >
> > i am working on JDBC Sink Conneector to take data from kafka topic to
> > mysql.
> >
> > i am having 2 questions.
> >
> > i am using normal Apache Kafka 2.5 not a confluent version.
> >
> > 1)For inserting data every time we need to add the schema data also with
> > every data,How can i overcome this situation?i want to give only the
> data.
> >
> > 2)In certain time i need to update the existing record without adding as
> a
> > new record.How can i achieve this?
> >
>


Re: Kafka consumer

2020-05-07 Thread vishnu murali
Thanks Chris

But that did not work; I tried it as well.

I found the solution:

the default behavior of @KafkaListener is itself to take records one at a
time.

On Thu, May 7, 2020, 11:28 Chris Toomey  wrote:

> You can set the max.poll.records config. setting to 1 in order to pull down
> and process 1 record at a time.
>
> See https://kafka.apache.org/documentation/#consumerconfigs .
>
> On Mon, May 4, 2020 at 1:04 AM vishnu murali 
> wrote:
>
> > Hey Guys,
> >
> > I am having a topic and in that topic I am having 3000 messages
> >
> > In my springboot application I want to consume the data using
> > @KafkaListener()  and also one by one because  I need to do some tedious
> > process on that Data it may take some time
> >
> > So within this time  I don't need to consume another data.
> >
> > After the process is finished only I need to consume the data from the
> > topic .?
> >
> > How can I do this?
> >
> > Any ideas?
> >
>


JDBC Sink Connector

2020-05-06 Thread vishnu murali
Hey Guys,

I am working on a JDBC Sink Connector to take data from a Kafka topic to MySQL.

I have 2 questions.

I am using plain Apache Kafka 2.5, not a Confluent version.

1) When inserting data, we need to add the schema to every record. How can
I avoid this? I want to send only the data.

2) Sometimes I need to update an existing record instead of adding a
new record. How can I achieve this?
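A minimal sketch of the two pieces behind these questions, assuming the JsonConverter with schemas.enable=true is in use (the setup where every message carries its schema). The helper names with_schema and jdbc_sink_upsert_config, the topic, the table fields and the connection URL are all illustrative; the config keys insert.mode, pk.mode and pk.fields are the JDBC sink connector's own.

```python
import json


def with_schema(payload, field_types, name="record"):
    """Wrap a flat dict in the schema/payload envelope JsonConverter expects."""
    schema = {
        "type": "struct",
        "name": name,
        "optional": False,
        "fields": [
            {"field": field, "type": type_name, "optional": False}
            for field, type_name in field_types.items()
        ],
    }
    return {"schema": schema, "payload": payload}


def jdbc_sink_upsert_config(name, topic, connection_url, pk_field):
    """Build a sink config that updates existing rows instead of inserting."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": topic,
            "connection.url": connection_url,
            "insert.mode": "upsert",
            # Take the primary key from a field of the record value.
            "pk.mode": "record_value",
            "pk.fields": pk_field,
        },
    }


if __name__ == "__main__":
    message = with_schema(
        {"id": 1, "title": "ACADEMY DINOSAUR"},
        {"id": "int32", "title": "string"},
    )
    print(json.dumps(message))
    print(json.dumps(jdbc_sink_upsert_config(
        "jdbc-sink-example", "films", "jdbc:mysql://localhost:3306/sample", "id"
    ), indent=2))
```

Wrapping the payload in a helper like this keeps producer code short, but the schema is still sent with every message; avoiding that requires a schema registry and a serializer that references it, as the replies in this thread describe.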


Need to consume one by one data

2020-05-03 Thread vishnu murali
Hi guys

I have a Kafka topic, Films, holding 2000 records.

In Spring Boot I am listening to that topic with a KafkaListener.

But I need to process the records one at a time, and consume the next
record only after the current one is finished.

How can I do this?

Any ideas for this scenario?


Connect-Mirror Error

2020-05-02 Thread vishnu murali
Hey Guys,

Here is the stack trace that occurred in connect-distributed when I
submitted the mirror connector configuration:

*Post*:http://localhost:8083/connectors

*Request json Body:*
{
"name": "us-west-sourc",
"config": {
"connector.class":
"org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "cluster 9092",
"target.cluster.alias": "cluster 9091",
"source.cluster.bootstrap.servers": "localhost:9092",
"topics": "vis-city"

}
}

It says that bootstrap.servers, which has no default value, is missing.

Even when I add that config to the request, the response is the same.

What may be the problem?

I am trying to copy data from a topic in one cluster to another
cluster.

1) It is possible with the MirrorMaker command.

But I want to do the same with a connector, by submitting a request that
then copies the data.

What do I need to change?


[2020-05-02 20:40:43,304] ERROR WorkerConnector{id=us-west-sourc} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:477)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:216)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:71)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
at org.apache.kafka.connect.mirror.MirrorSourceConnector.start(MirrorSourceConnector.java:115)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1190)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:126)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1206)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1202)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:830)
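One possible cause, not confirmed in this thread: the request sets source.cluster.bootstrap.servers but no target.cluster.bootstrap.servers, and the trace shows the failure while MirrorSourceConnector.start creates an admin client. As far as I know, the MirrorMaker 2 connectors read the client settings of each cluster from keys prefixed with source.cluster. and target.cluster. respectively. The sketch below builds a request body with both sides set. The function name and the alias values are placeholders; the ports are the ones used in this message.

```python
import json


def mirror_source_config(name, source_alias, source_bootstrap,
                         target_alias, target_bootstrap, topics):
    """Build the JSON body for POST /connectors for a MirrorSourceConnector."""
    return {
        "name": name,
        "config": {
            "connector.class":
                "org.apache.kafka.connect.mirror.MirrorSourceConnector",
            "source.cluster.alias": source_alias,
            "target.cluster.alias": target_alias,
            # Each cluster gets its own bootstrap address.
            "source.cluster.bootstrap.servers": source_bootstrap,
            "target.cluster.bootstrap.servers": target_bootstrap,
            "topics": topics,
        },
    }


if __name__ == "__main__":
    body = mirror_source_config(
        name="mirror-source-example",   # hypothetical connector name
        source_alias="source",          # placeholder alias without spaces
        source_bootstrap="localhost:9092",
        target_alias="target",          # placeholder alias without spaces
        target_bootstrap="localhost:9091",
        topics="vis-city",
    )
    print(json.dumps(body, indent=2))
```

With the default replication policy the source alias becomes a prefix of the replicated topic name, which is one reason to keep aliases short and free of spaces.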


Re: Connect-Mirror 2.5.0

2020-05-02 Thread vishnu murali
Hi Liam,

Thanks for the reply.

I want the connector style of execution described there.

Here:
PUT /connectors/us-west-source/config HTTP/1.1

{
"name": "us-west-source",
"connector.class":
"org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "us-west",
"target.cluster.alias": "us-east",
"source.cluster.bootstrap.servers": "us-west-host1:9091",
"topics": ".*"
}

I can specify the source in source.cluster.bootstrap.servers.


So where do I configure the target cluster's bootstrap server address?


For example, the source is localhost:9092
and the target is localhost:8092.

So where do I specify the target?


On Sat, May 2, 2020, 19:13 Liam Clarke-Hutchinson 
wrote:

> Hi Vishnu,
>
> As per my earlier email:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Walkthrough:RunningMirrorMaker2.0
>
> In the same vein, any questions, hit me up,
>
> Liam Clarke-Hutchinson
>
> On Sat, May 2, 2020 at 9:56 PM vishnu murali 
> wrote:
>
> > Hey Guys
> >
> > I am using Apache version of 2.5
> >
> > Correct me if I am wrong!!
> >
> > Here there is a jar file called Connect-Mirror2.5.0 in the libs folder.I
> > think it is a connector  to  copy the topic data between one cluster to
> > another cluster like MirrorMaker..
> >
> > So I started zookeeper
> > I started Kafka server
> > I started connect-distributed
> >
> >
> > So what are the json configurations to give in the Post Request to make
> > that connector work..
> >
> > How can I mention source ,destination clusters whitelist topics in that
> > configuration file and process?
> >
>


Connect-Mirror 2.5.0

2020-05-02 Thread vishnu murali
Hey Guys

I am using Apache Kafka 2.5.

Correct me if I am wrong!

There is a jar file called Connect-Mirror2.5.0 in the libs folder. I
think it is a connector that copies topic data from one cluster to
another, like MirrorMaker.

So I started ZooKeeper,
then the Kafka server,
then connect-distributed.


What JSON configuration do I need to send in the POST request to make
that connector work?

How do I specify the source and destination clusters and the whitelisted
topics in that configuration?


Re: Connector For MirrorMaker

2020-05-01 Thread vishnu murali
Hi Robin

I am using Apache Kafka, which has a script called kafka-mirror-maker.bat
that takes consumer and producer properties to copy a topic from one
cluster to another.

I want to do that with a connector.

I am not familiar with MirrorMaker 2, and I don't know how to
download and configure it with Apache Kafka.

Can you guide me on how to get started with the MirrorMaker 2 connector?

On Fri, May 1, 2020, 19:13 Robin Moffatt  wrote:

> Are you talking about MirrorMaker 2? That runs as a connector.
>
> If not, perhaps you can clarify your question a bit as to what it is you
> are looking for.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Fri, 1 May 2020 at 13:57, vishnu murali 
> wrote:
>
> > Hi Guys
> >
> > Previously I asked question about the Mirror maker and it is solved now.
> >
> > So Now I need to know is there any connectors available for that same.
> >
> > Like JdbcConnector acts as a source and sink for DB connection is there
> any
> > connector available for performing mirror operations
> >
> >  or
> >
> > does some one having own created connectors for this purpose??
> >
>


Connector For MirrorMaker

2020-05-01 Thread vishnu murali
Hi Guys

Previously I asked a question about MirrorMaker, and it is solved now.

Now I need to know whether there is a connector available for the same task.

Just as the JDBC connector acts as a source and sink for a DB connection, is
there a connector available for performing mirror operations?

Or

has someone created their own connector for this purpose?


Cant Able to start Kafka MirrorMaker

2020-05-01 Thread vishnu murali
Hey Guys,

I am trying to move data from one cluster to another.



            Source   Destination
Zookeeper   2181     2182
Kafka       9092     9091



*ConsumerProperties:*

bootstrap.servers=localhost:9092
group.id=test-consumer-group
auto.offset.rest=earliest



*Producer Properties:*
bootstrap.servers=localhost:9091
compression.type=none


On 9092 I have a topic named actor, which comes from the actor table of the
MySQL Sakila schema.


On 9091 I don't have any topic, so I tried to migrate the data from 9092
to 9091. It shows the following:

D:\kafka>.\bin\windows\kafka-mirror-maker.bat --consumer.config
.\config\consumer.properties --producer.config .\config\producer.properties
--whitelist actor


WARNING: The default partition assignment strategy of the mirror maker will
change from 'range' to 'roundrobin' in an upcoming release (so that better
load balancing can be achieved). If you prefer to make this switch in
advance of that release add the following to the corresponding config:
'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'


[2020-05-01 12:04:55,024] WARN The configuration 'auto.offset.rest' was
supplied but isn't a known config.
(org.apache.kafka.clients.consumer.ConsumerConfig)


*It is stuck at that point; no further operations are performed*


and the topic is not copied to 9091.


I don't know why.


Can anyone identify the problem and explain how to do this, please?
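The WARN line above says 'auto.offset.rest' is not a known config; the consumer setting with the closest name is auto.offset.reset. A small sketch of catching this kind of misspelling before starting the tool follows. The list of known keys is a short hand-picked subset, not the full consumer config, and the helper names are hypothetical. This does not by itself explain why the mirror stalls; it only removes one source of doubt.

```python
import difflib

# A hand-picked subset of consumer config names, for illustration only.
KNOWN_CONSUMER_KEYS = [
    "auto.offset.reset",
    "bootstrap.servers",
    "enable.auto.commit",
    "group.id",
    "max.poll.records",
    "partition.assignment.strategy",
]


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unknown_keys(props, known=KNOWN_CONSUMER_KEYS):
    """Return each unrecognised key with its closest known name, if any."""
    return {
        key: difflib.get_close_matches(key, known, n=1)
        for key in props
        if key not in known
    }


if __name__ == "__main__":
    consumer_properties = """
    bootstrap.servers=localhost:9092
    group.id=test-consumer-group
    auto.offset.rest=earliest
    """
    print(unknown_keys(parse_properties(consumer_properties)))
    # {'auto.offset.rest': ['auto.offset.reset']}
```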


Re: Kafka Mirror Maker 2

2020-04-30 Thread vishnu murali
Hi Himanshu

Can you please tell me how to use MM2?

I am using Apache Kafka, where only the normal MirrorMaker seems to be available.

Most people say to use MM2, but I could not find out where
to get it.

Is it part of Apache Kafka or does it come from some other distributor?

Can you please explain how to install that version?

On Thu, Apr 30, 2020, 01:35 Himanshu Tyagi 
wrote:

> Hey Team,
> I've a few doubts regarding how the producers work after failover in Mirror
> Maker 2
>
> Let's say that we have two clusters K1 and K2 and configured MM2
> replication for TOPIC1 (originally created in just K1).
>
> We configured the active-active replication:
>
> K1->K2.enabled = true
> K2->K1.enabled = true
> K1->K2.topics = .*
> K2->K1.topics = .*
>
> On starting mirror maker 2, I see that topics are replicated from cluster
> K1
> to K2 in the naming format K1.topic_name_here and vice versa for topics
> from cluster K2 to K1.
>
> I see that there was no topic TOPIC1 created in K2, only K1.TOPIC1 was
> created. I see this scenario working for consumers, as in the beginning the
> consumers consume TOPIC1 from cluster K1. When the cluster K1 out of
> service, fail over happens. Consumers start to consume K1.TOPIC1 from K2.
>
> My questions are as follows:
>
> 1. Producers won't produce to the topic K1.TOPIC1 in cluster K2, so my
>    question is how the producers go about producing data. Do I need to
>    manually create a topic TOPIC1 in cluster K2, which will then be used
>    for producing messages when failover happens?
> 2. If the above scenario is true, how do I move back to my primary
>    cluster K1? By now the topic TOPIC1 in cluster K2 has diverged from
>    the topic TOPIC1 in K1. How do we sync the messages in this scenario?
>
>
> --
> Best,
> Himanshu Tyagi
> Contact No.- +1 480 465 0625
>
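The naming scheme described in the quoted message (a replicated topic appears on the target as the source alias, a dot, then the topic name) can be sketched as below. The function names are hypothetical; the "." separator matches the K1.TOPIC1 example in the message. The second helper shows one way a consumer might cover both the local topic and its replicated copies after a failover. It is a sketch of the idea, not a statement of how MM2 clients must be written.

```python
def remote_topic(source_alias, topic, separator="."):
    """Name a replicated topic receives on the target cluster."""
    return f"{source_alias}{separator}{topic}"


def topics_to_subscribe(topic, remote_aliases):
    """Local topic plus its replicated copies from each remote cluster."""
    return [topic] + [remote_topic(alias, topic) for alias in remote_aliases]


if __name__ == "__main__":
    # On cluster K2, records mirrored from K1 land in K1.TOPIC1.
    print(remote_topic("K1", "TOPIC1"))           # K1.TOPIC1
    # A consumer on K2 that wants both locally produced and mirrored data:
    print(topics_to_subscribe("TOPIC1", ["K1"]))  # ['TOPIC1', 'K1.TOPIC1']
```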


Re: Apache Kafka cluster to cluster

2020-04-29 Thread vishnu murali
I start the Kafka mirror using the configuration below:

.\bin\windows\kafka-mirror-maker.bat --consumer.config
.\config\consumer.properties --producer.config .\config\producer.properties
--whitelist=".*"

*Consumer.properties:*
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

*Producer.properties:*
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9091

# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none


I get this warning, and the output stays at that message for more than
30 minutes. No data is transferred.

WARNING: The default partition assignment strategy of the mirror maker will
change from 'range' to 'roundrobin' in an upcoming release (so that better
load balancing can be achieved). If you prefer to make this switch in
advance of that release add the following to the corresponding config:
'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'

Can anyone clarify why I am getting this and what I am doing wrong?

On Thu, Apr 30, 2020 at 12:22 AM vishnu murali 
wrote:

> Thanks Blake..
>
> More over can we use any connector types for this situation?
>
> Like source is a topic and sink is also an another topic
>
> Is this possible...
>
> On Thu, Apr 30, 2020, 00:19 Blake Miller  wrote:
>
>> Hi Vishnu,
>>
>> Check out MirrorMaker
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
>>
>> This can do what you want. Note that the offsets are not copied, nor are
>> the message timestamps.
>>
>> HTH
>>
>>
>> On Wed, Apr 29, 2020 at 6:47 PM vishnu murali > >
>> wrote:
>>
>> > Hi Guys,
>> >
>> > I am having two separate Kafka cluster running in two independent
>> zookeeper
>> >
>> > I need to send a set of data from one topic from cluster A to cluster B
>> > with the same topic name  with all data also..
>> >
>> > How can I achieve this
>> >
>> > Done anyone have any idea ??
>> >
>>
>


Re: JDBC source connector to Kafka topic

2020-04-29 Thread vishnu murali
I am using plain Apache Kafka only, not Confluent.

So I start Kafka step by step:

.\bin\windows\kafka-server-start.bat .\config\server.properties

That is how I start it. Even without a shutdown, I need to send the
configuration details through a POST request every time to get the data
from the DB.

On Thu, Apr 30, 2020, 02:04 Robin Moffatt  wrote:

> How are you running Kafka? Do you mean when you shut it down you have to
> reconfigure the connector?
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Wed, 29 Apr 2020 at 20:03, vishnu murali 
> wrote:
>
> > Hi guys
> > I am trying that JDBC source connector to get data from MySQL and send
> as a
> > data in a topic,so here what I am facing is there is more manual here
> >
> > After starting zookeeper,server, connect-distributed in Apache kafka
> >
> > I need to give Post request every time to the localhost:8083/connectors
> > with the request body of config details when I need data and also all
> data
> > will come again and again..
> >
> > Is there any way to achieve this CDC?
> >
>


JDBC source connector to Kafka topic

2020-04-29 Thread vishnu murali
Hi guys
I am trying the JDBC source connector to get data from MySQL and send it
to a topic, but what I am facing is a lot of manual work.

After starting ZooKeeper, the server, and connect-distributed in Apache Kafka,

I need to send a POST request to localhost:8083/connectors, with the
config details in the request body, every time I need data, and all the
data comes again and again.

Is there any way to achieve CDC here?


One cluster topic to another cluster topic

2020-04-29 Thread vishnu murali
Hi Guys,

I have two separate Kafka clusters, each running with its own independent ZooKeeper.

I need to send the data of one topic from cluster A to cluster B,
keeping the same topic name and all of the data.

How can I achieve this?

Does anyone have any idea?


Re: Apache Kafka cluster to cluster

2020-04-29 Thread vishnu murali
Thanks Blake..

Moreover, can we use any connector type for this situation,

with a topic as the source and another topic as the sink?

Is this possible?

On Thu, Apr 30, 2020, 00:19 Blake Miller  wrote:

> Hi Vishnu,
>
> Check out MirrorMaker
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
>
> This can do what you want. Note that the offsets are not copied, nor are
> the message timestamps.
>
> HTH
>
>
> On Wed, Apr 29, 2020 at 6:47 PM vishnu murali 
> wrote:
>
> > Hi Guys,
> >
> > I am having two separate Kafka cluster running in two independent
> zookeeper
> >
> > I need to send a set of data from one topic from cluster A to cluster B
> > with the same topic name  with all data also..
> >
> > How can I achieve this
> >
> > Done anyone have any idea ??
> >
>


Apache Kafka cluster to cluster

2020-04-29 Thread vishnu murali
Hi Guys,

I have two separate Kafka clusters, each running with its own independent ZooKeeper.

I need to send the data of one topic from cluster A to cluster B,
keeping the same topic name and all of the data.

How can I achieve this?

Does anyone have any idea?