Yeah, this is pretty bad right now. There is a proposal to add an API to DoFn that makes it easy to keep state across multiple bundles.
I think the current sink is pretty unusable in this state. We could use a cache of producers (with entries that expire if unused for a few seconds).

On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]> wrote:

> The KafkaIO KafkaProducer seems to be closing after every send. On the
> next send, the KafkaProducer is opened again.
>
> Here is the log output:
>
> 2016-06-22 11:04:49,356 WARN KafkaIO:714 - Looks like generateSplits() is not called. Generate single split.
> 2016-06-22 11:04:49,399 INFO ConsumerConfig:165 - ConsumerConfig values:
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     group.id =
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     max.partition.fetch.bytes = 1048576
>     bootstrap.servers = [broker1:9092]
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.keystore.type = JKS
>     ssl.trustmanager.algorithm = PKIX
>     enable.auto.commit = false
>     ssl.key.password = null
>     fetch.max.wait.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     ssl.truststore.password = null
>     session.timeout.ms = 30000
>     metrics.num.samples = 2
>     client.id =
>     ssl.endpoint.identification.algorithm = null
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     ssl.protocol = TLS
>     check.crcs = true
>     request.timeout.ms = 40000
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.keystore.location = null
>     heartbeat.interval.ms = 3000
>     auto.commit.interval.ms = 5000
>     receive.buffer.bytes = 524288
>     ssl.cipher.suites = null
>     ssl.truststore.type = JKS
>     security.protocol = PLAINTEXT
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     fetch.min.bytes = 1
>     send.buffer.bytes = 131072
>     auto.offset.reset = earliest
>
> 2016-06-22 11:04:49,681 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:49,681 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:04:49,865 INFO KafkaIO:692 - Partitions assigned to split 0 (total 1): eventsim-0
> 2016-06-22 11:04:49,921 INFO ConsumerConfig:165 - ConsumerConfig values:
>     [identical to the ConsumerConfig values above]
>
> 2016-06-22 11:04:49,924 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:49,924 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:04:49,926 INFO KafkaIO:937 - Reader-0: resuming eventsim-0 at default offset
> 2016-06-22 11:04:49,934 INFO ConsumerConfig:165 - ConsumerConfig values:
>     [identical to the ConsumerConfig values above, except group.id = Reader-0_offset_consumer_698533492_none]
>
> 2016-06-22 11:04:49,941 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:49,942 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:04:50,819 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> 2016-06-22 11:04:50,884 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> 2016-06-22 11:04:50,931 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> 2016-06-22 11:04:50,992 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> 2016-06-22 11:04:51,061 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
> 2016-06-22 11:04:51,212 INFO KafkaIO:1013 - Reader-0: first record offset 0
> HERE
> HERE
> HERE
> 2016-06-22 11:04:51,529 INFO ProducerConfig:165 - ProducerConfig values:
>     compression.type = none
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     metadata.fetch.timeout.ms = 60000
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     bootstrap.servers = [broker1:9092]
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     buffer.memory = 33554432
>     timeout.ms = 30000
>     key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.keystore.type = JKS
>     ssl.trustmanager.algorithm = PKIX
>     block.on.buffer.full = false
>     ssl.key.password = null
>     max.block.ms = 60000
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     ssl.truststore.password = null
>     max.in.flight.requests.per.connection = 5
>     metrics.num.samples = 2
>     client.id =
>     ssl.endpoint.identification.algorithm = null
>     ssl.protocol = TLS
>     request.timeout.ms = 30000
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     acks = 1
>     batch.size = 16384
>     ssl.keystore.location = null
>     receive.buffer.bytes = 32768
>     ssl.cipher.suites = null
>     ssl.truststore.type = JKS
>     security.protocol = PLAINTEXT
>     retries = 3
>     max.request.size = 1048576
>     value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>     send.buffer.bytes = 131072
>     linger.ms = 0
>
> 2016-06-22 11:04:51,574 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:51,574 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:04:51,671 WARN NetworkClient:582 - Error while fetching metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
> 2016-06-22 11:04:51,797 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> HERE
> 2016-06-22 11:05:13,247 INFO ProducerConfig:165 - ProducerConfig values:
>     [identical to the ProducerConfig values above]
>
> 2016-06-22 11:05:13,252 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:05:13,252 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:05:13,361 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> HERE
> 2016-06-22 11:05:15,003 INFO ProducerConfig:165 - ProducerConfig values:
>     [identical to the ProducerConfig values above]
>
> 2016-06-22 11:05:15,008 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:05:15,008 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:05:15,120 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> HERE
> 2016-06-22 11:06:20,735 INFO ProducerConfig:165 - ProducerConfig values:
>     [identical to the ProducerConfig values above]
>
> 2016-06-22 11:06:20,743 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:06:20,744 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
> 2016-06-22 11:06:20,849 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>
> Thanks,
>
> Jesse
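The producer cache suggested above could look something like the following minimal sketch. This is plain Java with a hypothetical `ProducerCache` class; the generic `Factory` interface stands in for `new KafkaProducer<>(config)`, and the names, the 5-second expiry, and the eviction strategy are all illustrative assumptions, not KafkaIO's actual API.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: producers shared across bundles, keyed by their
// config, and closed only after sitting unused for a few seconds.
class ProducerCache<K, P extends AutoCloseable> {

    // "Expire if unused for a few seconds" -- illustrative value.
    private static final long EXPIRY_MS = 5_000;

    // Stand-in for creating a real KafkaProducer from its config.
    public interface Factory<K, P> {
        P create(K key);
    }

    private static class Entry<P> {
        final P producer;
        volatile long lastUsedMs;
        Entry(P producer, long now) {
            this.producer = producer;
            this.lastUsedMs = now;
        }
    }

    private final Map<K, Entry<P>> cache = new ConcurrentHashMap<>();
    private final Factory<K, P> factory;

    ProducerCache(Factory<K, P> factory) {
        this.factory = factory;
    }

    /** Returns the cached producer for this key, creating one on first use. */
    public P get(K key) {
        long now = System.currentTimeMillis();
        Entry<P> e = cache.computeIfAbsent(key, k -> new Entry<>(factory.create(k), now));
        e.lastUsedMs = now;  // refresh on every send, so active producers never expire
        return e.producer;
    }

    /** Closes and drops producers that have gone unused for EXPIRY_MS. */
    public void evictExpired() {
        long cutoff = System.currentTimeMillis() - EXPIRY_MS;
        for (Iterator<Map.Entry<K, Entry<P>>> it = cache.entrySet().iterator(); it.hasNext(); ) {
            Entry<P> e = it.next().getValue();
            if (e.lastUsedMs < cutoff) {
                it.remove();
                try {
                    e.producer.close();
                } catch (Exception ex) {
                    // best effort: log and keep evicting the rest
                }
            }
        }
    }

    public int size() {
        return cache.size();
    }
}
```

A background thread (or a check at the start of each bundle) would call `evictExpired()` periodically; since producers are keyed by their configuration, bundles sharing a config reuse one open connection instead of reopening and closing the producer around every send, which is what the log above shows.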
