Hi, this depends on the Runner that you're using and how it subdivides the elements into bundles. The Kafka writer opens and closes a Producer for every bundle, so if every element arrives more or less in its own bundle, you will get a new Producer for every element.
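For illustration, here is a rough sketch of the pattern JB suggests below: one producer per DoFn instance, created in @Setup and closed in @Teardown, so it is reused across bundles. This is not KafkaIO's actual writer; the class name, topic, and String serializers are made up for the example, and it assumes the Beam Java SDK DoFn lifecycle plus the plain Kafka producer API.

import java.util.Properties;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical writer that keeps one KafkaProducer per DoFn instance
// instead of opening and closing one per bundle.
public class CachedProducerWriteFn extends DoFn<KV<String, String>, Void> {

  private final String bootstrapServers;
  private final String topic;
  private transient KafkaProducer<String, String> producer;

  public CachedProducerWriteFn(String bootstrapServers, String topic) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
  }

  @Setup
  public void setup() {
    // Created once per DoFn instance, so it survives across bundles.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, String> kv = c.element();
    producer.send(new ProducerRecord<>(topic, kv.getKey(), kv.getValue()));
  }

  @FinishBundle
  public void finishBundle() {
    // Flush at bundle boundaries so sends are durable before the bundle commits,
    // but keep the producer open for the next bundle.
    producer.flush();
  }

  @Teardown
  public void teardown() {
    if (producer != null) {
      producer.close();
    }
  }
}

Whether that reuse is safe depends on the runner's bundle and checkpoint semantics; flushing in @FinishBundle is what keeps at-least-once delivery intact.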
Cheers,
Aljoscha

On Wed, 22 Jun 2016 at 18:25 Jean-Baptiste Onofré <[email protected]> wrote:
> Hi Jesse,
>
> Gonna take a look, but the Writer could keep a KafkaProducer handler
> IMHO (it's what I'm doing in the Jms IO).
>
> I keep you posted.
>
> Regards
> JB
>
> On 06/22/2016 06:12 PM, Jesse Anderson wrote:
> > The KafkaIO KafkaProducer seems to be closing after every send. On the
> > next send, the KafkaProducer is opened again.
> >
> > Here is the log output:
> > 2016-06-22 11:04:49,356 WARN KafkaIO:714 - Looks like generateSplits() is not called. Generate single split.
> > 2016-06-22 11:04:49,399 INFO ConsumerConfig:165 - ConsumerConfig values:
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > group.id =
> > partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > max.partition.fetch.bytes = 1048576
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > enable.auto.commit = false
> > ssl.key.password = null
> > fetch.max.wait.ms = 500
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > session.timeout.ms = 30000
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > ssl.protocol = TLS
> > check.crcs = true
> > request.timeout.ms = 40000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.keystore.location = null
> > heartbeat.interval.ms = 3000
> > auto.commit.interval.ms = 5000
> > receive.buffer.bytes = 524288
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > fetch.min.bytes = 1
> > send.buffer.bytes = 131072
> > auto.offset.reset = earliest
> >
> > 2016-06-22 11:04:49,681 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:49,681 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:04:49,865 INFO KafkaIO:692 - Partitions assigned to split 0 (total 1): eventsim-0
> > 2016-06-22 11:04:49,921 INFO ConsumerConfig:165 - ConsumerConfig values:
> > [ConsumerConfig values identical to the dump above]
> >
> > 2016-06-22 11:04:49,924 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:49,924 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:04:49,926 INFO KafkaIO:937 - Reader-0: resuming eventsim-0 at default offset
> > 2016-06-22 11:04:49,934 INFO ConsumerConfig:165 - ConsumerConfig values:
> > [ConsumerConfig values identical to the dump above, except group.id = Reader-0_offset_consumer_698533492_none]
> >
> > 2016-06-22 11:04:49,941 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:49,942 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:04:50,819 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> > 2016-06-22 11:04:50,884 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> > 2016-06-22 11:04:50,931 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> > 2016-06-22 11:04:50,992 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> > 2016-06-22 11:04:51,061 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> > 2016-06-22 11:04:51,212 INFO KafkaIO:1013 - Reader-0: first record offset 0
> > HERE
> > HERE
> > HERE
> > 2016-06-22 11:04:51,529 INFO ProducerConfig:165 - ProducerConfig values:
> > compression.type = none
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > metadata.fetch.timeout.ms = 60000
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > buffer.memory = 33554432
> > timeout.ms = 30000
> > key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > block.on.buffer.full = false
> > ssl.key.password = null
> > max.block.ms = 60000
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > max.in.flight.requests.per.connection = 5
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > ssl.protocol = TLS
> > request.timeout.ms = 30000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > acks = 1
> > batch.size = 16384
> > ssl.keystore.location = null
> > receive.buffer.bytes = 32768
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > retries = 3
> > max.request.size = 1048576
> > value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> > send.buffer.bytes = 131072
> > linger.ms = 0
> >
> > 2016-06-22 11:04:51,574 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:51,574 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:04:51,671 WARN NetworkClient:582 - Error while fetching metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
> > 2016-06-22 11:04:51,797 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > HERE
> > 2016-06-22 11:05:13,247 INFO ProducerConfig:165 - ProducerConfig values:
> > [ProducerConfig values identical to the dump above]
> >
> > 2016-06-22 11:05:13,252 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:05:13,252 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:05:13,361 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > HERE
> > 2016-06-22 11:05:15,003 INFO ProducerConfig:165 - ProducerConfig values:
> > [ProducerConfig values identical to the dump above]
> >
> > 2016-06-22 11:05:15,008 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:05:15,008 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:05:15,120 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > HERE
> > 2016-06-22 11:06:20,735 INFO ProducerConfig:165 - ProducerConfig values:
> > [ProducerConfig values identical to the dump above]
> >
> > 2016-06-22 11:06:20,743 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:06:20,744 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> > 2016-06-22 11:06:20,849 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> >
> > Thanks,
> >
> > Jesse
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
