> Question 1: at what point, if ever, is the original topic purged to make 
> space?  If I set an infinite retention policy on the topic, will compaction 
> ever reclaim space?

This is an implementation mechanism inside pulsar. In pulsar, topic is a 
logical concept. A topic corresponds to a manage ledger. When you write data, 
it actually writes the data to the ledger. One core of pulsar design is: In 
pulsar, all operations are asynchronous, so when the retention reaches the 
specified threshold, whether to delete the data in the corresponding ledger, 
this operation is also asynchronous. The delete operation is not performed on 
the currently active ledger. Only when the data is full of the current ledger, 
the ledger will switch to the real execution of the retention policy, freeing 
up the space.

> Question 2: I don't understand what happens if you run compaction on an 
> already-compacted topic.  Is the compacted topic compacted, or is the 
> original topic re-compacted, or something else?

As you have provided, we can get the operation state of the last compact 
through compaction-stats. If it succeeds, the next compact operation will 
continue to be compact on the already-compacted basis instead of starting from 
the original. But whether it will actually trigger the compression operation, 
it needs further inspection.

For more details on compact topic, you can refer to: 
https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction 
<https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction>

> 在 2019年10月11日,下午9:12,Brian Candler <[email protected]> 写道:
> 
> I have a couple of questions about topic compaction.
> 
> I have been able to demonstrate it working, using the python API (*).  After 
> producing messages with repeated keys, I did a manual compaction:
> 
> $ apache-pulsar-2.4.1/bin/pulsar-admin topics compact avro-topic
> Topic compaction requested for persistent://public/default/avro-topic
> $ apache-pulsar-2.4.1/bin/pulsar-admin topics compaction-status avro-topic
> Compaction was a success
> $ apache-pulsar-2.4.1/bin/pulsar-admin topics reset-cursor 
> persistent://public/default/avro-topic -s my-subscription -t 10h
> 
> The consumer (created with is_read_compacted=True) then sees only the latest 
> message for each key.  That's great.
> 
> 
> However, all the messages in the original topic are still available - I can 
> see them using the reader API.
> 
> Question 1: at what point, if ever, is the original topic purged to make 
> space?  If I set an infinite retention policy on the topic, will compaction 
> ever reclaim space?
> 
> Question 2: I don't understand what happens if you run compaction on an 
> already-compacted topic.  Is the compacted topic compacted, or is the 
> original topic re-compacted, or something else?
> 
> Regards,
> 
> Brian.
> 
> 
> (*) Code:
> 
> === producer ===
> 
> import pulsar
> 
> from pulsar.schema import *
> 
> class Foo(Record):
>     a = String()
>     b = Integer()
>     c = Boolean()
> 
> client = pulsar.Client('pulsar://localhost:6650')
> 
> producer = client.create_producer('avro-topic', schema=AvroSchema(Foo))
> 
> for i in range(10):
>     producer.send(Foo(a="hello%d" % i, b=i, c=True), partition_key=str(i % 3))
> 
> client.close()
> 
> # Problem 1: the python API doesn't have "key", but it does have 
> "partition_key".  So even though I'm using a non-partitioned topic, I'm 
> assuming that "partition_key" equates to "message key"
> 
> === reader ===
> 
> from pulsar.schema import *
> 
> class Foo(Record):
>     a = String()
>     b = Integer()
>     c = Boolean()
> 
> client = pulsar.Client('pulsar://localhost:6650')
> 
> msg_id = pulsar.MessageId.earliest
> 
> reader = client.create_reader('avro-topic', msg_id,
>          schema=AvroSchema(Foo),
>          #is_read_compacted=True,
>          )
> 
> while True:
>     msg = reader.read_next()
>     print("Received message %r key=%r id=%s" % (msg.value().a, 
> msg.partition_key(), msg.message_id()))
> 
> # Problem 2: the "reader" API doesn't appear to have "is_read_compacted", so 
> it's always reading from the non-compacted version of the topic.
> 
> === consumer ===
> 
> import pulsar
> 
> from pulsar.schema import *
> 
> class Foo(Record):
>     a = String()
>     b = Integer()
>     c = Boolean()
> 
> client = pulsar.Client('pulsar://localhost:6650')
> 
> consumer = client.subscribe('avro-topic', 'my-subscription', 
> schema=AvroSchema(Foo),
>             is_read_compacted=True)
> 
> while True:
>     msg = consumer.receive()
>     try:
>         print("Received message %r key=%r id=%s" % (msg.value().a, 
> msg.partition_key(), msg.message_id()))
>         consumer.acknowledge(msg)
>     except Exception as e:
>         # Message failed to be processed
>         print("Oops %r" % e)
>         consumer.negative_acknowledge(msg)
> 
> # Works if the topic is manually compacted and then the cursor is manually 
> reset
> 
> ======
> 
> 

Reply via email to