Releases: xiatechs/pulsar
v1.0.5
v1.0.3
v1.0.1
v1.0.0
v1.0.4
[fix] [ml] fix discontinuous ledger deletion (#20898)

### Motivation

- The task `trim ledgers` runs in the thread `BkMainThreadPool.choose(ledgerName)`
- The task `write entries to BK` runs in the thread `BkMainThreadPool.choose(ledgerId)`

So the two tasks above may run concurrently.

The task `trim ledgers` works as follows:
- Find the ledgers that no longer need to be read; the result is `{Ledgers before the slowest read}`.
- Check whether the `{Ledgers before the slowest read}` fall outside the retention policy; the result is `{Ledgers to be deleted}`.
  - If the creation time of the ledger is earlier than the earliest retention time, mark it for deletion.
  - If, after deleting this ledger, the remaining ledgers are still larger than the retention size, mark it for deletion.
- Delete the `{Ledgers to be deleted}`.

**(Highlight)** There is a scenario in which the task `trim ledgers` deletes ledgers discontinuously, so that messages are consumed discontinuously:
- Context:
  - ledgers: `[{id=1, size=100}, {id=2, size=100}]`
  - retention size: 150
  - no cursor exists
- Check `ledger 1`: it is skipped by the retention check `(200 - 100) < 150`.
- One in-flight write finishes, so `calculateTotalSizeWrited()` now returns `300`.
- Check `ledger 2`: the retention check gives `(300 - 100) > 150`, so `ledger 2` is marked for deletion.
- Delete `ledger 2`.
- Create a new consumer. It receives messages from `[ledger-1, ledger-3]`, but `ledger-2` is skipped.

### Modifications

Once the retention constraint has been met, break the loop.
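
The scenario and the fix can be illustrated with a small, self-contained sketch. This is not the actual managed-ledger code: the class `TrimSketch`, the `Ledger` type, the methods `selectLedgersToDelete` and `growingTotal`, and the `breakWhenRetained` flag are all hypothetical names introduced for illustration. The total written size is modelled as a supplier that is re-read on every check, which stands in for a concurrent write completing mid-loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the size-based retention check.
// It is NOT the real trimming code, only a sketch of the race and the fix.
class TrimSketch {
    static final class Ledger {
        final long id;
        final long size;
        Ledger(long id, long size) { this.id = id; this.size = size; }
    }

    // Returns the ids of ledgers selected for deletion, oldest first.
    static List<Long> selectLedgersToDelete(List<Ledger> candidates,
                                            LongSupplier totalSize,
                                            long retentionSize,
                                            boolean breakWhenRetained) {
        List<Long> toDelete = new ArrayList<>();
        long markedSize = 0; // bytes already marked for deletion
        for (Ledger ledger : candidates) {
            // The total is re-read per ledger, so a write that completes
            // between two iterations changes the outcome of the check.
            long remaining = totalSize.getAsLong() - markedSize - ledger.size;
            if (remaining > retentionSize) {
                toDelete.add(ledger.id);
                markedSize += ledger.size;
            } else if (breakWhenRetained) {
                // The fix: once one ledger must be retained, stop, so that
                // no later ledger can be deleted behind it.
                break;
            }
        }
        return toDelete;
    }

    // Simulates a total size that grows between successive reads.
    static LongSupplier growingTotal(long... readings) {
        int[] next = {0};
        return () -> readings[Math.min(next[0]++, readings.length - 1)];
    }

    public static void main(String[] args) {
        List<Ledger> ledgers = List.of(new Ledger(1, 100), new Ledger(2, 100));
        // Total is 200 when ledger 1 is checked and 300 when ledger 2 is checked.
        System.out.println(selectLedgersToDelete(ledgers, growingTotal(200, 300), 150, false));
        System.out.println(selectLedgersToDelete(ledgers, growingTotal(200, 300), 150, true));
    }
}
```

Under these assumptions, the variant without the early exit selects only `ledger 2`, leaving a gap after `ledger 1`, while the variant that breaks on the first retained ledger selects nothing. Breaking out of the loop keeps the deleted ledgers a contiguous prefix regardless of how the total size changes during the scan.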