I share the same connection factory/MessageProducer across the different
writers (optional).
Regards
JB
On 06/22/2016 10:34 PM, Jesse Anderson wrote:
@Raghu is there a reason why you'd need more than one KafkaProducer per
process? Maybe the KafkaProducer could be static.
@JB how are you handling this in the JMS IO?
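For illustration, a process-wide static producer along the lines Jesse suggests could be sketched as below. This is a hedged sketch, not KafkaIO's actual code: a hypothetical `Producer` stand-in replaces the real `KafkaProducer` (which needs a broker to construct), and the point is only the lazy, JVM-wide sharing via double-checked locking.

```java
// Hypothetical stand-in for org.apache.kafka.clients.producer.KafkaProducer,
// so the sharing pattern is self-contained.
class Producer {
}

// One producer per JVM, created lazily and shared by every bundle/thread.
class SharedProducer {
    private static volatile Producer instance;

    static Producer get() {
        Producer p = instance;
        if (p == null) {
            synchronized (SharedProducer.class) {
                if (instance == null) {
                    instance = new Producer();  // created once, on first use
                }
                p = instance;
            }
        }
        return p;
    }
}
```

The trade-off, as the thread notes, is that a single static instance assumes one producer config per process; with several KafkaIO sinks using different configs, a keyed cache is needed instead.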
On Wed, Jun 22, 2016 at 12:36 PM Aljoscha Krettek <[email protected]
<mailto:[email protected]>> wrote:
The Flink runner sits at the other end of the spectrum, in that
everything is one big bundle. So the Kafka Sink should work well
with that.
On Wed, 22 Jun 2016 at 20:30 Raghu Angadi <[email protected]
<mailto:[email protected]>> wrote:
Yeah, this is pretty bad right now. There is a proposal to add
API to DoFn that makes it easy to keep state across multiple
bundles. I think the current sink is pretty unusable in this
state. We could use a cache of producers (that expire if unused
for a few seconds).
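A minimal sketch of the expiring producer cache Raghu describes might look like the following. Everything here is hypothetical (a `StubProducer` stands in for the real `KafkaProducer`, `EXPIRY_MS` is an invented knob, and time is passed in explicitly to keep the sketch deterministic); it only illustrates the reuse-and-expire pattern.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.kafka.clients.producer.KafkaProducer,
// so the caching pattern is self-contained and testable.
class StubProducer {
    boolean closed = false;
    void close() { closed = true; }
}

// Cache of producers keyed by their config; entries unused for longer
// than EXPIRY_MS are closed and evicted on the next lookup.
class ProducerCache {
    static final long EXPIRY_MS = 5_000; // "a few seconds"

    private static class Entry {
        final StubProducer producer;
        long lastUsedMs;
        Entry(StubProducer p, long now) { producer = p; lastUsedMs = now; }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    // nowMs is injected for determinism; a real implementation would use
    // System.currentTimeMillis() and likely evict from a background thread.
    synchronized StubProducer get(String configKey, long nowMs) {
        evictExpired(nowMs);
        Entry e = cache.computeIfAbsent(configKey,
                k -> new Entry(new StubProducer(), nowMs));
        e.lastUsedMs = nowMs;
        return e.producer;
    }

    private void evictExpired(long nowMs) {
        cache.entrySet().removeIf(en -> {
            if (nowMs - en.getValue().lastUsedMs > EXPIRY_MS) {
                en.getValue().producer.close();
                return true;
            }
            return false;
        });
    }
}
```

With this shape, repeated bundles that share a config reuse one producer, while an idle producer is closed shortly after its last use rather than after every send.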
On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson
<[email protected] <mailto:[email protected]>> wrote:
The KafkaIO KafkaProducer seems to be closing after every
send. On the next send, the KafkaProducer is opened again.
Here is the log output:
2016-06-22 11:04:49,356 WARN KafkaIO:714 - Looks like generateSplits() is not called. Generate single split.
2016-06-22 11:04:49,399 INFO ConsumerConfig:165 - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id =
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [broker1:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 524288
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    auto.offset.reset = earliest
2016-06-22 11:04:49,681 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:04:49,681 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:04:49,865 INFO KafkaIO:692 - Partitions assigned to split 0 (total 1): eventsim-0
2016-06-22 11:04:49,921 INFO ConsumerConfig:165 - ConsumerConfig values: [identical to the dump above]
2016-06-22 11:04:49,924 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:04:49,924 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:04:49,926 INFO KafkaIO:937 - Reader-0: resuming eventsim-0 at default offset
2016-06-22 11:04:49,934 INFO ConsumerConfig:165 - ConsumerConfig values: [same as above, except group.id = Reader-0_offset_consumer_698533492_none]
2016-06-22 11:04:49,941 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:04:49,942 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:04:50,819 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
2016-06-22 11:04:50,884 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
2016-06-22 11:04:50,931 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
2016-06-22 11:04:50,992 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
2016-06-22 11:04:51,061 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
2016-06-22 11:04:51,212 INFO KafkaIO:1013 - Reader-0: first record offset 0
HERE
HERE
HERE
2016-06-22 11:04:51,529 INFO ProducerConfig:165 - ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [broker1:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 3
    max.request.size = 1048576
    value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
2016-06-22 11:04:51,574 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:04:51,574 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:04:51,671 WARN NetworkClient:582 - Error while fetching metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
2016-06-22 11:04:51,797 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
HERE
2016-06-22 11:05:13,247 INFO ProducerConfig:165 - ProducerConfig values: [identical to the dump above]
2016-06-22 11:05:13,252 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:05:13,252 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:05:13,361 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
HERE
2016-06-22 11:05:15,003 INFO ProducerConfig:165 - ProducerConfig values: [identical to the dump above]
2016-06-22 11:05:15,008 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:05:15,008 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:05:15,120 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
HERE
2016-06-22 11:06:20,735 INFO ProducerConfig:165 - ProducerConfig values: [identical to the dump above]
2016-06-22 11:06:20,743 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
2016-06-22 11:06:20,744 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-06-22 11:06:20,849 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Thanks,
Jesse
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com