The Flink runner sits at the other end of the spectrum, in that everything
is one big bundle. So the Kafka Sink should work well with it.
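
Raghu's suggestion below — a cache of producers that expire if unused for a
few seconds — could be sketched roughly like this. This is a hypothetical
illustration, not KafkaIO's actual code: in KafkaIO the cached value would be
a KafkaProducer keyed by its configuration, and eviction would also need to
call producer.close().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Minimal sketch of an expire-when-idle cache: entries are created on a
// miss and dropped once they sit unused past the expiry window, so a
// producer would survive across many sends instead of being recreated
// for each one.
class ExpiringCache<K, V> {

  private static final class Entry<V> {
    final V value;
    long lastUsedMillis;

    Entry(V value, long now) {
      this.value = value;
      this.lastUsedMillis = now;
    }
  }

  private final long expiryMillis;
  private final Map<K, Entry<V>> entries = new HashMap<>();

  ExpiringCache(long expiryMillis) {
    this.expiryMillis = expiryMillis;
  }

  /** Returns the cached value for key, creating it with factory on a miss. */
  synchronized V get(K key, Supplier<V> factory) {
    long now = System.currentTimeMillis();
    // Evict entries that have been idle past the expiry window. A real
    // producer cache would close() the evicted producers here.
    entries.values().removeIf(e -> now - e.lastUsedMillis > expiryMillis);
    Entry<V> entry =
        entries.computeIfAbsent(key, k -> new Entry<>(factory.get(), now));
    entry.lastUsedMillis = now;
    return entry.value;
  }
}
```

Repeated sends with the same key would then reuse one producer, and an idle
pipeline would still release it after the expiry window.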

On Wed, 22 Jun 2016 at 20:30 Raghu Angadi <[email protected]> wrote:

> Yeah, this is pretty bad right now. There is a proposal to add an API to
> DoFn that makes it easy to keep state across multiple bundles.
>
> I think the current sink is pretty unusable in this state. We could use a
> cache of producers (that expire if unused for a few seconds).
>
> On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]>
> wrote:
>
>> The KafkaIO KafkaProducer seems to be closing after every send. On the
>> next send, the KafkaProducer is opened again.
>>
>> Here is the log output;
>> 2016-06-22 11:04:49,356 WARN  KafkaIO:714 - Looks like generateSplits()
>> is not called. Generate single split.
>> 2016-06-22 11:04:49,399 INFO  ConsumerConfig:165 - ConsumerConfig values:
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id =
>> partition.assignment.strategy =
>> [org.apache.kafka.clients.consumer.RangeAssignor]
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> max.partition.fetch.bytes = 1048576
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> enable.auto.commit = false
>> ssl.key.password = null
>> fetch.max.wait.ms = 500
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> session.timeout.ms = 30000
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> ssl.protocol = TLS
>> check.crcs = true
>> request.timeout.ms = 40000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.keystore.location = null
>> heartbeat.interval.ms = 3000
>> auto.commit.interval.ms = 5000
>> receive.buffer.bytes = 524288
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> fetch.min.bytes = 1
>> send.buffer.bytes = 131072
>> auto.offset.reset = earliest
>>
>> 2016-06-22 11:04:49,681 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:49,681 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:04:49,865 INFO  KafkaIO:692 - Partitions assigned to split
>> 0 (total 1): eventsim-0
>> 2016-06-22 11:04:49,921 INFO  ConsumerConfig:165 - ConsumerConfig values:
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id =
>> partition.assignment.strategy =
>> [org.apache.kafka.clients.consumer.RangeAssignor]
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> max.partition.fetch.bytes = 1048576
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> enable.auto.commit = false
>> ssl.key.password = null
>> fetch.max.wait.ms = 500
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> session.timeout.ms = 30000
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> ssl.protocol = TLS
>> check.crcs = true
>> request.timeout.ms = 40000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.keystore.location = null
>> heartbeat.interval.ms = 3000
>> auto.commit.interval.ms = 5000
>> receive.buffer.bytes = 524288
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> fetch.min.bytes = 1
>> send.buffer.bytes = 131072
>> auto.offset.reset = earliest
>>
>> 2016-06-22 11:04:49,924 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:49,924 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:04:49,926 INFO  KafkaIO:937 - Reader-0: resuming eventsim-0
>> at default offset
>> 2016-06-22 11:04:49,934 INFO  ConsumerConfig:165 - ConsumerConfig values:
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> group.id = Reader-0_offset_consumer_698533492_none
>> partition.assignment.strategy =
>> [org.apache.kafka.clients.consumer.RangeAssignor]
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> max.partition.fetch.bytes = 1048576
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> enable.auto.commit = false
>> ssl.key.password = null
>> fetch.max.wait.ms = 500
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> session.timeout.ms = 30000
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> ssl.protocol = TLS
>> check.crcs = true
>> request.timeout.ms = 40000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.keystore.location = null
>> heartbeat.interval.ms = 3000
>> auto.commit.interval.ms = 5000
>> receive.buffer.bytes = 524288
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> fetch.min.bytes = 1
>> send.buffer.bytes = 131072
>> auto.offset.reset = earliest
>>
>> 2016-06-22 11:04:49,941 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:49,942 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:04:50,819 INFO  AbstractCoordinator:529 - Marking the
>> coordinator 2147483647 dead.
>> 2016-06-22 11:04:50,884 INFO  AbstractCoordinator:529 - Marking the
>> coordinator 2147483647 dead.
>> 2016-06-22 11:04:50,931 INFO  AbstractCoordinator:529 - Marking the
>> coordinator 2147483647 dead.
>> 2016-06-22 11:04:50,992 INFO  AbstractCoordinator:529 - Marking the
>> coordinator 2147483647 dead.
>> 2016-06-22 11:04:51,061 INFO  AbstractCoordinator:529 - Marking the
>> coordinator 2147483647 dead.
>> 2016-06-22 11:04:51,212 INFO  KafkaIO:1013 - Reader-0: first record
>> offset 0
>> HERE
>> HERE
>> HERE
>> 2016-06-22 11:04:51,529 INFO  ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:04:51,574 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:04:51,574 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:04:51,671 WARN  NetworkClient:582 - Error while fetching
>> metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
>> 2016-06-22 11:04:51,797 INFO  KafkaProducer:613 - Closing the Kafka
>> producer with timeoutMillis = 9223372036854775807 ms.
>> HERE
>> 2016-06-22 11:05:13,247 INFO  ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:05:13,252 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:05:13,252 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:05:13,361 INFO  KafkaProducer:613 - Closing the Kafka
>> producer with timeoutMillis = 9223372036854775807 ms.
>> HERE
>> 2016-06-22 11:05:15,003 INFO  ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:05:15,008 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:05:15,008 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:05:15,120 INFO  KafkaProducer:613 - Closing the Kafka
>> producer with timeoutMillis = 9223372036854775807 ms.
>> HERE
>> 2016-06-22 11:06:20,735 INFO  ProducerConfig:165 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> reconnect.backoff.ms = 50
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> bootstrap.servers = [broker1:9092]
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> ssl.keystore.type = JKS
>> ssl.trustmanager.algorithm = PKIX
>> block.on.buffer.full = false
>> ssl.key.password = null
>> max.block.ms = 60000
>> sasl.kerberos.min.time.before.relogin = 60000
>> connections.max.idle.ms = 540000
>> ssl.truststore.password = null
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> client.id =
>> ssl.endpoint.identification.algorithm = null
>> ssl.protocol = TLS
>> request.timeout.ms = 30000
>> ssl.provider = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> acks = 1
>> batch.size = 16384
>> ssl.keystore.location = null
>> receive.buffer.bytes = 32768
>> ssl.cipher.suites = null
>> ssl.truststore.type = JKS
>> security.protocol = PLAINTEXT
>> retries = 3
>> max.request.size = 1048576
>> value.serializer = class
>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>> ssl.truststore.location = null
>> ssl.keystore.password = null
>> ssl.keymanager.algorithm = SunX509
>> metrics.sample.window.ms = 30000
>> partitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>> send.buffer.bytes = 131072
>> linger.ms = 0
>>
>> 2016-06-22 11:06:20,743 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>> 2016-06-22 11:06:20,744 INFO  AppInfoParser:83 - Kafka commitId :
>> 23c69d62a0cabf06
>> 2016-06-22 11:06:20,849 INFO  KafkaProducer:613 - Closing the Kafka
>> producer with timeoutMillis = 9223372036854775807 ms.
>>
>> Thanks,
>>
>> Jesse
>>
>
>