Proposal: Implement checkpoints for SyncMetas in compactors#8765
waltherlee wants to merge 3 commits into
Signed-off-by: Walther Lee <walthere.lee@gmail.com>
| * `knownBlockIDs` `map[ulid.ULID]struct{}`, all block IDs known since last compaction. | ||
| * `checkpointDir` `string`, for the on-disk checkpoint. | ||
| `blocksDownloading` keeps track of blocks downloading. After completion, only blocks with no previous blocks pending will be used for checkpoints. |
"only blocks with no previous blocks pending"? 😄 What does that mean?
Yeah, I struggled to explain the relations here, but I meant pending = download in progress 😅
I described it a little more in the paragraph below. Block meta files are downloaded concurrently, so if blocks A, B and C are being downloaded, it can happen that:
- `C` finishes but `A` is still pending. Without this, the `lastCheckpointableID` will be `C`.
- Compactor is restarted without finishing the download of `A`. A checkpoint is saved with `C`.
- After restart, the compactor resumes listing blocks after `C`, but it never downloaded `A`.
By keeping track of blocks being downloaded, it only uses a block as a checkpoint if no lexicographically lower block is still pending full download.
Realistically, meta files are small, and maybe the compactor waits to finish all downloads in progress when it receives a SIGTERM, but I wanted to cover some edge cases like throttling in storage, sudden connection issues, etc.
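To make the A/B/C scenario above concrete, here is a minimal sketch in Go of how this kind of tracking could work. It is not the code from the PR: `downloadTracker`, `start`, and `finish` are hypothetical names, and block IDs are plain strings, assuming they sort the same way ULIDs do lexicographically.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// downloadTracker is a hypothetical stand-in for blocksDownloading.
// Block IDs are plain strings here (assumption: same ordering as ULIDs).
type downloadTracker struct {
	mu                   sync.Mutex
	pending              map[string]struct{} // downloads still in progress
	completed            []string            // finished, but not yet covered by the checkpoint
	lastCheckpointableID string              // every block up to this ID is fully downloaded
}

func newDownloadTracker() *downloadTracker {
	return &downloadTracker{pending: map[string]struct{}{}}
}

// start registers a block whose meta file download has begun.
func (t *downloadTracker) start(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending[id] = struct{}{}
}

// finish marks a download as done and advances lastCheckpointableID only
// past blocks that have no lexicographically lower block still pending.
func (t *downloadTracker) finish(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	delete(t.pending, id)
	t.completed = append(t.completed, id)
	sort.Strings(t.completed)

	// Find the lowest block that is still downloading, if any.
	minPending, hasPending := "", false
	for p := range t.pending {
		if !hasPending || p < minPending {
			minPending, hasPending = p, true
		}
	}

	// Consume completed blocks in order until one sits above a pending block.
	i := 0
	for ; i < len(t.completed); i++ {
		if hasPending && t.completed[i] > minPending {
			break
		}
		t.lastCheckpointableID = t.completed[i]
	}
	t.completed = t.completed[i:]
}

func main() {
	tr := newDownloadTracker()
	for _, id := range []string{"A", "B", "C"} {
		tr.start(id)
	}
	tr.finish("C") // A and B are still pending, so the checkpoint must not move
	fmt.Printf("after C: %q\n", tr.lastCheckpointableID)
	tr.finish("A") // A is safe, but B is still pending below C
	fmt.Printf("after A: %q\n", tr.lastCheckpointableID)
	tr.finish("B") // nothing pending, so the checkpoint can jump to C
	fmt.Printf("after B: %q\n", tr.lastCheckpointableID)
}
```

In this sketch a restart right after `C` finishes would save an empty checkpoint rather than `C`, so `A` would be listed and downloaded again instead of being skipped.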
| For the on-disk checkpoint, `lastCheckpointableID` and `knownBlockIDs` are stored in a gzip-compressed JSON file in `checkpointDir`. If a file exists, it is loaded to `BaseFetcher` when the compactor starts. | ||
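As an illustration of the on-disk format described above, the following is a minimal sketch in Go that writes and reads a gzip-compressed JSON checkpoint. The struct layout, the JSON field names, the `checkpoint.json.gz` file name, and all function names are assumptions made for this example and are not taken from the PR. Block IDs are plain strings to keep the example free of external dependencies.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// checkpoint is a hypothetical on-disk layout for the two persisted fields.
type checkpoint struct {
	LastCheckpointableID string   `json:"lastCheckpointableID"`
	KnownBlockIDs        []string `json:"knownBlockIDs"`
}

// checkpointFile is an assumed file name inside checkpointDir.
const checkpointFile = "checkpoint.json.gz"

// encodeCheckpoint serializes the checkpoint as gzip-compressed JSON.
func encodeCheckpoint(cp checkpoint) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if err := json.NewEncoder(zw).Encode(cp); err != nil {
		return nil, err
	}
	// Close flushes remaining data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeCheckpoint is the inverse of encodeCheckpoint.
func decodeCheckpoint(data []byte) (checkpoint, error) {
	var cp checkpoint
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return cp, err
	}
	defer zr.Close()
	err = json.NewDecoder(zr).Decode(&cp)
	return cp, err
}

// saveCheckpoint writes to a temporary file and renames it, so that a crash
// mid-write does not leave a truncated checkpoint behind.
func saveCheckpoint(dir string, cp checkpoint) error {
	data, err := encodeCheckpoint(cp)
	if err != nil {
		return err
	}
	tmp := filepath.Join(dir, checkpointFile+".tmp")
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, filepath.Join(dir, checkpointFile))
}

// loadCheckpoint reports found=false when no checkpoint file exists.
func loadCheckpoint(dir string) (cp checkpoint, found bool, err error) {
	data, err := os.ReadFile(filepath.Join(dir, checkpointFile))
	if errors.Is(err, os.ErrNotExist) {
		return checkpoint{}, false, nil
	}
	if err != nil {
		return checkpoint{}, false, err
	}
	cp, err = decodeCheckpoint(data)
	return cp, err == nil, err
}

func main() {
	dir, err := os.MkdirTemp("", "checkpoint-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	in := checkpoint{LastCheckpointableID: "01B", KnownBlockIDs: []string{"01A", "01B"}}
	if err := saveCheckpoint(dir, in); err != nil {
		panic(err)
	}
	out, found, err := loadCheckpoint(dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(found, out.LastCheckpointableID, out.KnownBlockIDs)
}
```

The write-then-rename step is a design choice of this sketch, not something the proposal states. It is a common way to avoid loading a half-written file after an abrupt restart.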
| To resume a sync, the `GetActiveAndPartialBlockIDs` method in the `Lister` interface in the package `block` takes a new `startAfter` `string` that is passed to the object storage implementation in `obj_store`. Storage must respond by listing only objects alphabetically higher than the value. In this case, it doesn’t matter if it’s inclusive or not. |
Could you show some example of how this param could be used? Is there no way to pass a date there? 🤔
Interesting. It's definitely possible. The implementations in objstore take string keys, and the prefix in Thanos is the block ID, so it made sense to me to use what we already had in the listing results.
But yes, this could be a date or timestamp as well.
You mean to avoid referencing blocks in Lister?
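As a rough illustration of how a `startAfter` value could be used, here is a minimal sketch in Go that mimics the listing semantics over an in-memory slice of keys. `listAfter` is a hypothetical helper written for this example. It is not part of objstore or of the `Lister` interface, and a real storage backend would apply the filter server-side.

```go
package main

import (
	"fmt"
	"sort"
)

// listAfter is a hypothetical helper: it returns the keys that sort strictly
// after startAfter, in lexicographic order. An empty startAfter lists everything.
func listAfter(keys []string, startAfter string) []string {
	sorted := append([]string(nil), keys...) // copy so the caller's slice is untouched
	sort.Strings(sorted)

	// Index of the first key >= startAfter.
	i := sort.SearchStrings(sorted, startAfter)
	// This sketch is exclusive: skip the checkpointed key itself if present.
	if i < len(sorted) && sorted[i] == startAfter {
		i++
	}
	return sorted[i:]
}

func main() {
	// Shortened stand-ins for block IDs (assumption: real IDs are ULID strings).
	keys := []string{"01C", "01A", "01D", "01B"}

	fmt.Println(listAfter(keys, ""))    // full sync
	fmt.Println(listAfter(keys, "01B")) // resumed sync after the checkpoint
}
```

Because block IDs start with a timestamp component, resuming after the last checkpointed ID behaves much like resuming after a point in time, which is why a date or timestamp could serve the same purpose.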
| At the end of a cycle it removes the file and clears the in-memory checkpoint in `BaseFetcher`. | ||
| The checkpoint implementation is by default disabled and will have to be enabled with a flag. |
Why not enable this for all? This seems like a great addition.
No reason I can think of. You mean removing the flag or just enabling it by default?
| The compactor saves the checkpoint file after every `syncMetas` in the main compaction cycle. Basically before starting compactions, before each of both downsampling cycles and before applying retention policies. It also saves a checkpoint if an error interrupts the main `runCompact`. | ||
| At the end of a cycle it removes the file and clears the in-memory checkpoint in `BaseFetcher`. |
Why do we need to delete the checkpoint?
To remove blocks deleted in the cycle that just ended from the checkpoint.
I implemented this on v0.39.2, so I think that has already changed in the current version, but in v0.39, after finishing compactions and downsampling, the compactor deletes old blocks marked for deletion.
If we keep those in the checkpoint, the next cycle will fail when it tries to check if they have a deletion mark (or chunks), and that will trigger a full sync anyway, so I just decided to do it here 🙂
| * `enableCheckpoint` `bool`, true to enable checkpoints. False by default. | ||
| * `blocksDownloading` `struct`, sorted list to keep track of downloads pending. | ||
| * `lastCheckpointableID` `ulid.ULID`, all blocks up to this ID have been completely downloaded. | ||
| * `knownBlockIDs` `map[ulid.ULID]struct{}`, all block IDs known since last compaction. |
Why do we need this? Could you expand a little bit?
Sure! This backfills resp.metas (here) with all blocks that were already downloaded before the checkpoint, without re-fetching them from object storage.
We could recreate this from disk, but I've seen storage issues in interrupted compactors. For example, compactor shards that finish successfully delete blocks as expected, but stuck shards keep resuming and piling up meta files because cleanup only occurs after a full cycle. So these interrupted shards were hoarding blocks that were already deleted.
I also had shards that had been stuck for months, so the list of blocks on disk was way longer than the one on storage. By keeping only a list of blocks from the latest sync, I skip those deleted blocks and save disk lookups.
| The compactors already use `BaseFetcher` to sync metadata. This adds 5 new fields: | ||
| * `enableCheckpoint` `bool`, true to enable checkpoints. False by default. | ||
| * `blocksDownloading` `struct`, sorted list to keep track of downloads pending. |
Also, why is this needed? Isn't it enough to store the last timestamp embedded in a block?
Same as here: #8765 (comment)
To prevent a race condition where out-of-order downloads cause the checkpoint to jump ahead. This ensures we don't skip pending blocks that are lexicographically earlier but slower to download, avoiding gaps after a restart.
|
@waltherlee out of curiosity: how long does it take to list blocks in your case right now? Is it really about an hour? If you can answer: do you use some homegrown or open-source object storage implementation or something from SaaS providers? Is it the same with both listing strategies?
|
@GiedriusS Yes, over an hour with AWS S3 buckets in multiple clusters. It was mostly caused by a high rate of new blocks, a long list of pending ones (+1M total per cluster), and a low compaction rate in some shards. I could also check our GCP buckets, but the ones I checked when working on this were S3. However, we've had these checkpoints implemented in an internal Thanos fork for around 3 months now, so it no longer takes that long and our compactors already caught up with the queue. It takes around 7 minutes now to sync from disk, and milliseconds from the in-memory checkpoint. |
The 1h+ figure was with recursive. I did try with eager and remember it was faster, but not by much. I can check my notes next week. But the main problem we had was the interaction with the autoscaler. We have heavy blocks, so for a fresh compactor it usually takes time and multiple restarts to reach the right resources to process them. However, if the pod is constantly restarting and spending most of its time just reading from S3, the VPA scales it down, and the pods fall into a cycle of scaling up and down while just listing. That's how we ended up with the long list of pending blocks.
|
Ok, I checked my notes and
|
A little proposal to implement an in-memory cache for the compactor's MetaSync to speed up sequential syncs, especially the periodic ones for progress metrics, and also an on-disk checkpoint to recover a long MetaSync interrupted by a restart.
The main problems this is trying to solve are:
I tried to keep it short, but this is something I already implemented on a v0.39.2 fork and has been running for about 1 month with no issues in multiple buckets with 1M+ blocks, so I'm happy to share any more details you need 🙂