Skip to content

Commit 16920f6

Browse files
committed
Revert "[persist] Always perform routine maintenance (#16654)"
This reverts commit 0e1aaa0.
1 parent e6af892 commit 16920f6

11 files changed

Lines changed: 163 additions & 277 deletions

File tree

src/persist-client/src/cli/admin.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,9 @@ pub async fn force_compaction(
248248
.sum::<usize>(),
249249
start.elapsed(),
250250
);
251-
let (apply_res, maintenance) = machine
251+
let apply_res = machine
252252
.merge_res(&FueledMergeRes { output: res.output })
253253
.await;
254-
if !maintenance.is_empty() {
255-
info!("ignoring non-empty requested maintenance: {maintenance:?}")
256-
}
257254
match apply_res {
258255
ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => {
259256
info!("attempt {} req {}: {:?}", attempt, idx, apply_res);

src/persist-client/src/critical.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,7 @@ where
297297
/// Politely expires this reader, releasing its since capability.
298298
#[instrument(level = "debug", skip_all, fields(shard = %self.machine.shard_id()))]
299299
pub async fn expire(mut self) {
300-
let (_, maintenance) = self.machine.expire_critical_reader(&self.reader_id).await;
301-
maintenance.start_performing(&self.machine, &self.gc);
300+
self.machine.expire_critical_reader(&self.reader_id).await;
302301
}
303302
}
304303

src/persist-client/src/internal/compact.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use tracing::{debug, debug_span, trace, Instrument, Span};
4040
use crate::async_runtime::CpuHeavyRuntime;
4141
use crate::batch::BatchParts;
4242
use crate::fetch::{fetch_batch_part, EncodedPart};
43-
use crate::internal::gc::GarbageCollector;
4443
use crate::internal::machine::{retry_external, Machine};
4544
use crate::internal::state::{HollowBatch, HollowBatchPart};
4645
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
@@ -74,7 +73,7 @@ pub struct CompactRes<T> {
7473
/// This will possibly be called over RPC in the future. Physical compaction is
7574
/// merging adjacent batches. Logical compaction is advancing timestamps to a
7675
/// new since and consolidating the resulting updates.
77-
#[derive(Debug)]
76+
#[derive(Debug, Clone)]
7877
pub struct Compactor<K, V, T, D> {
7978
cfg: PersistConfig,
8079
metrics: Arc<Metrics>,
@@ -87,30 +86,18 @@ pub struct Compactor<K, V, T, D> {
8786
_phantom: PhantomData<fn() -> D>,
8887
}
8988

90-
impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
91-
fn clone(&self) -> Self {
92-
Compactor {
93-
cfg: self.cfg.clone(),
94-
metrics: Arc::clone(&self.metrics),
95-
sender: self.sender.clone(),
96-
_phantom: Default::default(),
97-
}
98-
}
99-
}
100-
10189
impl<K, V, T, D> Compactor<K, V, T, D>
10290
where
10391
K: Debug + Codec,
10492
V: Debug + Codec,
10593
T: Timestamp + Lattice + Codec64,
106-
D: Semigroup + Codec64 + Send + Sync,
94+
D: Semigroup + Codec64 + Send,
10795
{
10896
pub fn new(
10997
cfg: PersistConfig,
11098
metrics: Arc<Metrics>,
11199
cpu_heavy_runtime: Arc<CpuHeavyRuntime>,
112100
writer_id: WriterId,
113-
gc: GarbageCollector<K, V, T, D>,
114101
) -> Self {
115102
let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
116103
Instant,
@@ -165,7 +152,6 @@ where
165152
let compact_span =
166153
debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
167154
compact_span.follows_from(&Span::current());
168-
let gc = gc.clone();
169155
let _ = mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
170156
let res = Self::compact_and_apply(
171157
cfg,
@@ -175,7 +161,6 @@ where
175161
req,
176162
writer_id,
177163
&mut machine,
178-
&gc,
179164
)
180165
.instrument(compact_span)
181166
.await;
@@ -253,7 +238,6 @@ where
253238
req: CompactReq<T>,
254239
writer_id: WriterId,
255240
machine: &mut Machine<K, V, T, D>,
256-
gc: &GarbageCollector<K, V, T, D>,
257241
) -> Result<ApplyMergeResult, anyhow::Error> {
258242
metrics.compaction.started.inc();
259243
let start = Instant::now();
@@ -317,8 +301,7 @@ where
317301
match res {
318302
Ok(Ok(res)) => {
319303
let res = FueledMergeRes { output: res.output };
320-
let (apply_merge_result, maintenance) = machine.merge_res(&res).await;
321-
maintenance.start_performing(machine, gc);
304+
let apply_merge_result = machine.merge_res(&res).await;
322305
match &apply_merge_result {
323306
ApplyMergeResult::AppliedExact => {
324307
metrics.compaction.applied.inc();

src/persist-client/src/internal/gc.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
use std::collections::HashSet;
1111
use std::fmt::Debug;
1212
use std::marker::PhantomData;
13-
use std::mem;
1413
use std::time::Instant;
1514

1615
use differential_dataflow::difference::Semigroup;
@@ -24,19 +23,19 @@ use tokio::sync::{mpsc, oneshot};
2423
use tracing::{debug, debug_span, warn, Instrument, Span};
2524

2625
use crate::internal::machine::{retry_external, Machine};
27-
use crate::internal::maintenance::RoutineMaintenance;
2826
use crate::internal::paths::{PartialRollupKey, RollupId};
2927
use crate::ShardId;
3028

31-
#[derive(Debug, Clone, PartialEq)]
29+
#[derive(Debug, Clone)]
30+
#[cfg_attr(test, derive(PartialEq))]
3231
pub struct GcReq {
3332
pub shard_id: ShardId,
3433
pub new_seqno_since: SeqNo,
3534
}
3635

3736
#[derive(Debug)]
3837
pub struct GarbageCollector<K, V, T, D> {
39-
sender: UnboundedSender<(GcReq, oneshot::Sender<RoutineMaintenance>)>,
38+
sender: UnboundedSender<(GcReq, oneshot::Sender<()>)>,
4039
_phantom: PhantomData<fn() -> (K, V, T, D)>,
4140
}
4241

@@ -106,7 +105,7 @@ where
106105
{
107106
pub fn new(mut machine: Machine<K, V, T, D>) -> Self {
108107
let (gc_req_sender, mut gc_req_recv) =
109-
mpsc::unbounded_channel::<(GcReq, oneshot::Sender<RoutineMaintenance>)>();
108+
mpsc::unbounded_channel::<(GcReq, oneshot::Sender<()>)>();
110109

111110
// spin off a single task responsible for executing GC requests.
112111
// work is enqueued into the task through a channel
@@ -142,7 +141,7 @@ where
142141

143142
let start = Instant::now();
144143
machine.metrics.gc.started.inc();
145-
let mut maintenance = Self::gc_and_truncate(&mut machine, consolidated_req)
144+
Self::gc_and_truncate(&mut machine, consolidated_req)
146145
.instrument(gc_span)
147146
.await;
148147
machine.metrics.gc.finished.inc();
@@ -156,9 +155,8 @@ where
156155
// inform all callers who enqueued GC reqs that their work is complete
157156
for sender in gc_completed_senders {
158157
// we can safely ignore errors here, it's possible the caller
159-
// wasn't interested in waiting and dropped their receiver.
160-
// maintenance will be somewhat-arbitrarily assigned to the first oneshot.
161-
let _ = sender.send(mem::take(&mut maintenance));
158+
// wasn't interested in waiting and dropped their receiver
159+
let _ = sender.send(());
162160
}
163161
}
164162
});
@@ -172,10 +170,7 @@ where
172170
/// Enqueues a [GcReq] to be consumed by the GC background task when available.
173171
///
174172
/// Returns a future that indicates when GC has cleaned up to at least [GcReq::new_seqno_since]
175-
pub fn gc_and_truncate_background(
176-
&self,
177-
req: GcReq,
178-
) -> Option<oneshot::Receiver<RoutineMaintenance>> {
173+
pub fn gc_and_truncate_background(&self, req: GcReq) -> Option<oneshot::Receiver<()>> {
179174
let (gc_completed_sender, gc_completed_receiver) = oneshot::channel();
180175
let new_gc_sender = self.sender.clone();
181176
let send = new_gc_sender.send((req, gc_completed_sender));
@@ -193,10 +188,7 @@ where
193188
Some(gc_completed_receiver)
194189
}
195190

196-
pub async fn gc_and_truncate(
197-
machine: &mut Machine<K, V, T, D>,
198-
req: GcReq,
199-
) -> RoutineMaintenance {
191+
pub async fn gc_and_truncate(machine: &mut Machine<K, V, T, D>, req: GcReq) {
200192
assert_eq!(req.shard_id, machine.shard_id());
201193
// NB: Because these requests can be processed concurrently (and in
202194
// arbitrary order), all of the logic below has to work even if we've
@@ -232,7 +224,7 @@ where
232224
"gc {} early returning, already GC'd past {}",
233225
req.shard_id, req.new_seqno_since,
234226
);
235-
return RoutineMaintenance::default();
227+
return;
236228
}
237229

238230
let mut deleteable_batch_blobs = HashSet::new();
@@ -312,7 +304,7 @@ where
312304
.state_versions
313305
.write_rollup_blob(&machine.shard_metrics, &state, &rollup_key)
314306
.await;
315-
let (applied, maintenance) = machine
307+
let applied = machine
316308
.add_and_remove_rollups((rollup_seqno, &rollup_key), &deleteable_rollup_blobs)
317309
.await;
318310
// We raced with some other GC process to write this rollup out. Ours
@@ -359,7 +351,5 @@ where
359351
"gc {} truncated diffs through seqno {}",
360352
req.shard_id, req.new_seqno_since
361353
);
362-
363-
maintenance
364354
}
365355
}

0 commit comments

Comments
 (0)