The Flink runner sits at the other end of the spectrum, in that everything is one big bundle. So the Kafka sink should work well with that.
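To make the bundling point concrete, here is a minimal plain-Java sketch (no Beam or Kafka dependencies; `FakeProducer` and `writeBundles` are hypothetical stand-ins, not the actual KafkaIO code) of why a producer that is opened in startBundle and closed in finishBundle is cheap with one big bundle but expensive with many small ones:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for KafkaProducer: only counts how often it is constructed.
class FakeProducer implements AutoCloseable {
    static final AtomicInteger CREATED = new AtomicInteger();
    FakeProducer() { CREATED.incrementAndGet(); }
    void send(String record) { /* no-op for the sketch */ }
    @Override public void close() { }
}

public class BundleCost {
    // Mimics a sink that opens a producer per bundle and closes it afterwards.
    static void writeBundles(List<List<String>> bundles) {
        for (List<String> bundle : bundles) {
            try (FakeProducer producer = new FakeProducer()) { // startBundle
                bundle.forEach(producer::send);
            }                                                  // finishBundle
        }
    }

    public static void main(String[] args) {
        // One big bundle (Flink-style): a single producer for all records.
        writeBundles(List.of(List.of("a", "b", "c", "d")));
        int afterBigBundle = FakeProducer.CREATED.get();

        // Many one-record bundles: one producer per record.
        writeBundles(List.of(List.of("a"), List.of("b"), List.of("c"), List.of("d")));
        int afterSmallBundles = FakeProducer.CREATED.get() - afterBigBundle;

        System.out.println(afterBigBundle + " " + afterSmallBundles); // prints: 1 4
    }
}
```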
On Wed, 22 Jun 2016 at 20:30 Raghu Angadi <[email protected]> wrote:

> Yeah, this is pretty bad right now. There is a proposal to add an API to
> DoFn that makes it easy to keep state across multiple bundles.
>
> I think the current sink is pretty unusable in this state. We could use a
> cache of producers (that expire if unused for a few seconds).
>
> On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]>
> wrote:
>
>> The KafkaIO KafkaProducer seems to be closing after every send. On the
>> next send, the KafkaProducer is opened again.
>>
>> Here is the log output:
>> 2016-06-22 11:04:49,356 WARN KafkaIO:714 - Looks like generateSplits() is not called. Generate single split.
>> 2016-06-22 11:04:49,399 INFO ConsumerConfig:165 - ConsumerConfig values:
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id =
>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> max.partition.fetch.bytes = 1048576
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> enable.auto.commit = false
>> ssl.key.password = null
>> fetch.max.wait.ms = 500
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> session.timeout.ms = 30000
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> ssl.protocol = TLS
>> check.crcs = true
>> request.timeout.ms = 40000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.keystore.location = null
>> heartbeat.interval.ms = 3000
>> auto.commit.interval.ms = 5000
>> receive.buffer.bytes = 524288
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> fetch.min.bytes = 1
>> send.buffer.bytes = 131072
>> auto.offset.reset = earliest
>>
>> 2016-06-22 11:04:49,681 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:49,681 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:04:49,865 INFO KafkaIO:692 - Partitions assigned to split 0 (total 1): eventsim-0
>> 2016-06-22 11:04:49,921 INFO ConsumerConfig:165 - ConsumerConfig values:
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id =
>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> max.partition.fetch.bytes = 1048576
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> enable.auto.commit = false
>> ssl.key.password = null
>> fetch.max.wait.ms = 500
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> session.timeout.ms = 30000
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> ssl.protocol = TLS
>> check.crcs = true
>> request.timeout.ms = 40000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.keystore.location = null
>> heartbeat.interval.ms = 3000
>> auto.commit.interval.ms = 5000
>> receive.buffer.bytes = 524288
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> fetch.min.bytes = 1
>> send.buffer.bytes = 131072
>> auto.offset.reset = earliest
>>
>> 2016-06-22 11:04:49,924 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:49,924 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:04:49,926 INFO KafkaIO:937 - Reader-0: resuming eventsim-0 at default offset
>> 2016-06-22 11:04:49,934 INFO ConsumerConfig:165 - ConsumerConfig values:
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id = Reader-0_offset_consumer_698533492_none
>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> max.partition.fetch.bytes = 1048576
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> enable.auto.commit = false
>> ssl.key.password = null
>> fetch.max.wait.ms = 500
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> session.timeout.ms = 30000
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> ssl.protocol = TLS
>> check.crcs = true
>> request.timeout.ms = 40000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.keystore.location = null
>> heartbeat.interval.ms = 3000
>> auto.commit.interval.ms = 5000
>> receive.buffer.bytes = 524288
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> fetch.min.bytes = 1
>> send.buffer.bytes = 131072
>> auto.offset.reset = earliest
>>
>> 2016-06-22 11:04:49,941 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:49,942 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:04:50,819 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>> 2016-06-22 11:04:50,884 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>> 2016-06-22 11:04:50,931 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>> 2016-06-22 11:04:50,992 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>> 2016-06-22 11:04:51,061 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>> 2016-06-22 11:04:51,212 INFO KafkaIO:1013 - Reader-0: first record offset 0
>> HERE
>> HERE
>> HERE
>> 2016-06-22 11:04:51,529 INFO ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:04:51,574 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:51,574 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:04:51,671 WARN NetworkClient:582 - Error while fetching metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
>> 2016-06-22 11:04:51,797 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> HERE
>> 2016-06-22 11:05:13,247 INFO ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:05:13,252 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:05:13,252 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:05:13,361 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> HERE
>> 2016-06-22 11:05:15,003 INFO ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:05:15,008 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:05:15,008 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:05:15,120 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> HERE
>> 2016-06-22 11:06:20,735 INFO ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:06:20,743 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:06:20,744 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>> 2016-06-22 11:06:20,849 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>
>> Thanks,
>>
>> Jesse
>>
>
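Raghu's suggested mitigation above (a cache of producers that expire if unused for a few seconds) could be sketched roughly like this. Plain Java, hypothetical class and method names, not the actual KafkaIO code; the cache key would be something like the bootstrap servers plus producer config:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Minimal expiring cache: entries idle longer than ttlMillis are closed and
// evicted, so a producer survives across bundles as long as it keeps being used.
class ExpiringProducerCache<P extends AutoCloseable> {
    private static final class Entry<P> {
        final P producer;
        volatile long lastUsedMillis;
        Entry(P producer, long now) { this.producer = producer; this.lastUsedMillis = now; }
    }

    private final Map<String, Entry<P>> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    ExpiringProducerCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    // Returns the cached producer for this key, creating it on first use.
    P get(String key, Supplier<P> factory) {
        long now = System.currentTimeMillis();
        Entry<P> e = cache.computeIfAbsent(key, k -> new Entry<>(factory.get(), now));
        e.lastUsedMillis = now;
        return e.producer;
    }

    // Close and evict entries idle longer than the TTL; call this periodically.
    void evictExpired() {
        long cutoff = System.currentTimeMillis() - ttlMillis;
        for (Iterator<Map.Entry<String, Entry<P>>> it = cache.entrySet().iterator(); it.hasNext(); ) {
            Entry<P> e = it.next().getValue();
            if (e.lastUsedMillis < cutoff) {
                it.remove();
                try { e.producer.close(); } catch (Exception ignored) { }
            }
        }
    }
}
```

With such a cache the sink would call `get(...)` in each bundle instead of constructing a `KafkaProducer`, so frequent small bundles (as in the log above) reuse one producer rather than paying the open/close cost every time.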
