Hi Austin,
FileStreamSourceTask [1] does not parse file lines; it inserts them into Kafka 
with null keys (and it does not have the properties you mentioned).

To use connectors in your case, I see two ways, and both require code 
modifications:
1. Create your own FileStreamSourceTask (or propose a modification to the 
   Kafka community).
2. Create a Jira issue for Ignite [2] (and possibly contribute) to add 
   functionality that extracts keys from values (file lines in your case).
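
With either option, the key extraction itself can be quite small. Below is a 
minimal sketch, assuming each file line has the form key<separator>value and 
is split at the first separator. LineKeyExtractor and splitKeyValue are 
hypothetical names for illustration only; they are not part of Kafka Connect 
or Ignite.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper (not a Kafka or Ignite class): derives a key from a file line.
public class LineKeyExtractor {

    // Splits the line at the first occurrence of the separator.
    // If the separator is absent, the key is null and the whole line is the value.
    public static Map.Entry<String, String> splitKeyValue(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            return new SimpleImmutableEntry<>(null, line);
        }
        String key = line.substring(0, idx);
        String value = line.substring(idx + separator.length());
        return new SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> entry = splitKeyValue("k1,v1", ",");
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```

In a custom source task, the extracted key would be set on the record in place 
of the null key. On the Ignite side, the same logic would run on the record 
value before the entry is streamed.
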
[1] https://github.com/confluentinc/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
[2] https://issues.apache.org/jira/browse/IGNITE
- Roman

 

    On Tuesday, November 8, 2016 3:47 PM, austin solomon <[email protected]> wrote:
 

 Hi Roman,

Is it possible to set the configuration properties parse.key=true and
key.separator=, on the producer? When I start the
FileStreamSourceConnector, I see the following values:

INFO ProducerConfig values: 
    acks = all
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = producer-4
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 9223372036854775807
    max.in.flight.requests.per.connection = 1
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 2147483647
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Thanks,
Austin




--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Kafka-Failed-to-stream-a-record-with-null-key-tp8731p8766.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.


   
