Hi Raghu,
Thanks for the checkpointing info. I'll have to think about this more, but
I'm wondering if there's some custom work I can do to have the output of
the final stage perform some offset commits back to Kafka. There was
another thread here I commented on about giving Write transforms an output
so that further processing could be done if and only if a Write is
confirmed, and that would be very handy here. Is there perhaps some way to
wrap my PubSubIO Write stage in a custom class so that I can override
certain lifecycle stages to wire up a commit back to Kafka?
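To make the idea concrete, here is a rough sketch of the shape I'm imagining: a
DoFn at the tail of the pipeline that commits explicit offsets back to Kafka with
a standalone consumer. Everything in it is hypothetical (the KafkaOffsetCommitFn
name, the "topic:partition" / offset element type, and the assumption that it only
runs once the write is confirmed, which is exactly what Write transforms don't
give us today):

private static class KafkaOffsetCommitFn extends DoFn<KV<String, Long>, Void> {
    // Hypothetical sketch only; needs the org.apache.kafka.clients.consumer.*
    // and org.apache.kafka.common.TopicPartition imports.
    private transient KafkaConsumer<byte[], byte[]> consumer;

    @Setup
    public void setup() {
        final Map<String, Object> config = Maps.newHashMap();
        config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
        config.put("group.id", KAFKA_GROUP_ID);
        config.put("key.deserializer", ByteArrayDeserializer.class.getName());
        config.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumer = new KafkaConsumer<>(config);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Element is assumed to be ("topic:partition", offset) for a record
        // that has already been confirmed as written downstream.
        final String[] parts = c.element().getKey().split(":");
        final TopicPartition tp =
            new TopicPartition(parts[0], Integer.parseInt(parts[1]));
        // Kafka expects the next offset to consume, hence the +1.
        consumer.commitSync(Collections.singletonMap(
            tp, new OffsetAndMetadata(c.element().getValue() + 1)));
    }

    @Teardown
    public void teardown() {
        if (consumer != null) {
            consumer.close();
        }
    }
}

Whether a standalone consumer in the same group.id can commit like that while
KafkaIO's readers are active is something I'd still have to verify, so treat it
purely as an illustration of where I'd want the hook.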
Regarding the truststore on dataflow: I required the Factory because of the
local file problem you mention, so I put the rest of the configs in there
too. My solution to the local file problem was to make the keystore /
truststore files a Java class resource, since those are distributed to
workers, and the resource is written to a temporary location on the worker
during consumer creation:
private static class KafkaConsumerFactoryFn
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

    @Override
    public Consumer<byte[], byte[]> apply(Map<String, Object> stringObjectMap) {
        // Kafka's truststore is embedded as a static resource, but Kafka wants it
        // as a file on disk, so write it to a temp file and use that.
        final File truststore;
        try {
            truststore = File.createTempFile("truststore", ".jks");
            truststore.deleteOnExit();
            InputStream truststoreStream =
                JOB_CLASS_HERE.class.getResourceAsStream("RESOURCE_NAME_HERE");
            Files.copy(truststoreStream, truststore.toPath(),
                StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }

        ...

        config.put("ssl.truststore.location", truststore.toPath().toString());
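The factory then just gets handed to the read transform via
withConsumerFactoryFn(), much like my original snippet further down this thread
(sketched here without the spurious type parameters):

KafkaIO.readBytes()
    .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
    .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
    .withTopics(ImmutableList.of(KAFKA_TOPIC))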
Cheers,
Gwilym
On 30 June 2017 at 16:36, Raghu Angadi <[email protected]> wrote:
> Gwilym,
>
> I think your understanding is correct, with one caveat as noted below. As
> Jingsong suggested, committing offsets in
> 'KafkaCheckpointMark.finalizeCheckpoint()
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L48>'
> is required. I had left a comment there. It is fairly straightforward to
> add this as an option.
>
> Note that finalizing would give at least once semantics only if you drain
> a pipeline before restarting it. If a pipeline is killed or crashes, you
> can still miss some records. Finalize checkpoint is called in Dataflow once
> the messages are checkpointed for the current stage. The downstream stages
> might not have processed them. Draining a pipeline ensures that all the
> input is processed through the pipeline.
>
> > The consumer factory is used because of some runtime SSL key/truststore
> setup:
>
> btw, KafkaIO includes an API to set
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L207>
> consumer configs. Wasn't that enough?
> Did you get trust store config working with Dataflow? Last I remember
> "ssl.truststore.location" had to be a local file on the worker and it was
> not easy to make that accessible.
>
> On Thu, Jun 29, 2017 at 10:47 PM, JingsongLee <[email protected]>
> wrote:
>
>> Sorry, forget user mail group. + @user
>>
>> Yeah, although KafkaIO (which exposes an offsets interface) is different from
>> PubSubIO, committing the offsets to Kafka in finalizeCheckpoint is also
>> a way to do it.
>> You're welcome to contribute, and maybe @Raghu Angadi can share more details.
>>
>> Best, Jingsong Lee
>>
>> ------------------------------------------------------------------
>> From:Gwilym Evans <[email protected]>
>> Time:2017 Jun 30 (Fri) 13:27
>> To:JingsongLee <[email protected]>
>> Subject:Re: Kafka offset management
>>
>> Thanks for the info. I'll have a look and see if I can do anything
>> similar.
>>
>> I am very new to Beam internals, but I've been having a look at the
>> KafkaIO code and comparing it to the PubSubIO code.
>>
>> I think that the finalizeCheckpoint implementation for KafkaIO should
>> definitely be committing the offsets to Kafka, if possible. But perhaps
>> only when a Kafka group.id is configured, as committing offsets for
>> random or blank group IDs is kind of pointless.
>>
>> I think I'll take a shot at making and contributing this, even if it's
>> optional. Unless you can think of a reason to specifically not do this?
>>
>> Though, looking at the KafkaIO source for this, there is even a comment
>> there alluding to the fact that this should maybe be done to provide better
>> restart options.
>>
>> -Gwilym
>>
>>
>> On 30 June 2017 at 05:08, JingsongLee <[email protected]> wrote:
>> Oh. I know what you mean.
>>
>> In our production, if we need to re-run (we lose the checkpoint and state when
>> a job crashes, is canceled, or is drained), we set the KafkaIO
>> startTime to start a new job, because we generally know the last
>> consumer timestamp of the previous job. (It does not need to be too precise;
>> going back to a safe point is enough.)
>> This feature is finished in version 2.1.0.
>> Jira: https://issues.apache.org/jira/browse/BEAM-2248
>>
>> A more accurate way would be to re-run from Kafka offsets (not supported yet),
>> but you would need to know the last snapshot of the job.
>>
>> Best, Jingsong Lee
>>
>> ------------------------------------------------------------------
>> From:Gwilym Evans <[email protected]>
>> Time:2017 Jun 30 (Fri) 12:20
>> To:user <[email protected]>; JingsongLee <[email protected]>
>> Subject:Re: Kafka offset management
>>
>> Hi JingsongLee,
>>
>> Thanks for the reply.
>>
>> What I'm trying to avoid are lost / skipped messages due to two
>> situations:
>>
>> 1. Lost offsets, or
>> 2. Premature offset commits
>>
>> I've researched snapshot checkpoints, and from what I understand these
>> are only maintained in Dataflow when a job is updated. If a job crashes, is
>> cancelled, or is drained, then the checkpoints are lost. This is situation
>> (1) above.
>>
>> From what I understand about auto-commit offsets, it's where the Kafka
>> client periodically and automatically commits the offsets it has polled. In the
>> case of Beam and Dataflow, this would happen even if the offsets it is
>> committing have not yet been fully processed by the pipeline. This is
>> situation (2) above.
>>
>> So far I'm not seeing a way to avoid data loss besides resetting to the
>> earliest offset when a job starts. But, given we retain data in our Kafka
>> topics for up to 7 days, that is not feasible from a performance point of
>> view.
>>
>> Can anyone confirm / deny my understanding here?
>>
>> Cheers,
>> Gwilym
>>
>> On 30 June 2017 at 02:59, JingsongLee <[email protected]> wrote:
>> Hi Gwilym,
>> KafkaIO saves offsets to the snapshot (checkpoint) for restarting, instead of
>> committing offsets to Kafka.
>> You can use a Kafka client configuration to enable auto-commit of offsets:
>>
>> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
>>
>> Hope this helps.
>>
>> Best, JingsongLee
>>
>> ------------------------------------------------------------------
>> From:Gwilym Evans <[email protected]>
>> Time:2017 Jun 29 (Thu) 15:32
>> To:user <[email protected]>
>> Subject:Kafka offset management
>>
>> Hi list,
>>
>> I was playing around with KafkaIO today to understand how it behaves in a
>> failure or restart scenario (be it crash, cancel, or drain), and I found it
>> "lost" (or skipped) Kafka messages in these cases. That is, it resumed from
>> the latest offsets rather than the last successfully processed offsets, and
>> a gap in messages was observed as a result.
>>
>> My KafkaIO transform looks like:
>>
>> KafkaIO.readBytes()
>> .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
>> .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
>> .withTopics(ImmutableList.of(KAFKA_TOPIC))
>>
>> The consumer factory is used because of some runtime SSL key/truststore
>> setup:
>>
>> final Map<String, Object> config = Maps.newHashMap();
>> config.put("auto.offset.reset", "latest");
>> config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
>> config.put("group.id", KAFKA_GROUP_ID);
>> config.put("key.deserializer", ByteArrayDeserializer.class.getName());
>> config.put("security.protocol", "SSL");
>> config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
>> config.put("ssl.truststore.location", truststore.toPath().toString());
>> config.put("ssl.truststore.password", kafkaTruststorePassword);
>> config.put("value.deserializer", ByteArrayDeserializer.class.getName());
>>
>> return new KafkaConsumer<>(config);
>>
>> So, I am setting a group.id, and I know KafkaIO is using the new
>> consumer because our Zookeeper is not accessible from Dataflow.
>>
>> When I look on the Kafka cluster, there is no record of the consumer
>> group's offsets. So I take it KafkaIO is not committing offsets to Kafka.
>>
>> Can this be changed to commit offsets when a "batch" of streaming
>> messages is seen as processed OK? I am fine with at-least-once.
>>
>> I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
>>
>> Cheers,
>> Gwilym