> this means that I am losing data or if this will be retried by the sink?
I don't have direct experience with KafkaIO, but since this exception happened in the finishBundle method, Beam will not have committed the bundle. More specifically, looking at the KafkaWriter code, I see that finishBundle calls producer.flush(), which blocks until all messages have reached Kafka and the SendCallbacks have completed, and then checks whether any of the callbacks failed. In this case, one of the callbacks shows the above exception, so it raises, which should stop pipeline execution and prevent the bundle from being committed.

> what is the best way to configure the KafkaIO sink to be less aggressive?

I expect you may be able to avoid hitting these exceptions entirely with some combination of increasing the producer config settings retries and retry.backoff.ms [0]. These can be set via KafkaIO.write().updateProducerProperties; a rough sketch is included below the quoted message.

[0] https://kafka.apache.org/documentation/#producerconfigs

On Tue, Jan 22, 2019 at 5:06 AM Vilhelm von Ehrenheim <[email protected]> wrote:

> Hi!
> I sometimes get the following error in one of my streaming pipelines that
> use KafkaIO as sink:
>
> java.io.IOException: KafkaWriter : failed to send 1 records (since last
> report)
>
> org.apache.beam.sdk.io.kafka.KafkaWriter.checkForFailures(KafkaWriter.java:120)
>
> org.apache.beam.sdk.io.kafka.KafkaWriter.finishBundle(KafkaWriter.java:74)
> Caused by: org.apache.kafka.common.errors.KafkaStorageException: Disk error
> when trying to access log file on the disk.
>
> My Kafka setup is not super beefy and, as I understand it, this seems to
> happen when it is under heavy load from my Dataflow job.
>
> What I am wondering is essentially if this means that I am losing data or
> if this will be retried by the sink? Also, if this means losing the record,
> what is the best way to configure the KafkaIO sink to be less aggressive?
>
> I am still on Beam 2.8 in this pipeline.
>
> Regards,
> Vilhelm von Ehrenheim
>
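For concreteness, here is a rough, untested sketch of what that could look like. The broker addresses, topic name, the "records" PCollection, and the specific retry values are placeholders for illustration, not taken from your pipeline:

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// "records" is assumed to be a PCollection<KV<String, String>> built
// earlier in the pipeline.
Map<String, Object> producerProps = new HashMap<>();
// Retry transient broker-side errors (like the KafkaStorageException above)
// instead of surfacing them to KafkaWriter on the first failure.
producerProps.put(ProducerConfig.RETRIES_CONFIG, 10);
// Back off longer between retries to reduce pressure on the brokers.
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);

records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092,broker-2:9092")  // placeholder
        .withTopic("my-topic")                                // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .updateProducerProperties(producerProps));

The right values depend on how long your broker's disk issues tend to last; a higher retries count with a longer backoff simply gives Kafka more time to recover before KafkaWriter sees a failed callback.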
