Skip to content

Commit be2931b

Browse files
joshkang97facebook-github-bot
authored andcommitted
Split ingest external files into prepare and commit APIs (#14849)
Summary: This change splits ingestion into two public calls: `PrepareFileIngestion()` performs all of the off-mutex work and returns an opaque `FileIngestionHandle`, and `CommitFileIngestionHandle()` makes the prepared files visible under the mutex. Even though the "Prepare" phase was off the DB mutex, the application layer may still have application logic that depends on the completion of `IngestExternalFile`. `IngestExternalFiles(args)` is also just `PrepareFileIngestion(args)` followed by `CommitFileIngestionHandle()`, so its behavior is unchanged. `CommitFileIngestionHandles()` also supports committing multiple handles atomically. However, it is possible that a CF may be present in multiple handles. In this case, we merge the ingestion jobs together. This allows applications to prepare file ingestions at different times, but still provide a single atomic commit. Differential Revision: D108225105
1 parent 61c3e93 commit be2931b

16 files changed

Lines changed: 727 additions & 90 deletions

db/db_impl/db_impl.cc

Lines changed: 210 additions & 65 deletions
Large diffs are not rendered by default.

db/db_impl/db_impl.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ namespace ROCKSDB_NAMESPACE {
7676

7777
class Arena;
7878
class ArenaWrappedDBIter;
79+
class FileIngestionHandleImpl;
7980
class InMemoryStatsHistoryIterator;
8081
class MemTable;
8182
class PersistentStatsHistoryIterator;
@@ -604,6 +605,14 @@ class DBImpl : public DB {
604605
Status IngestExternalFiles(
605606
const std::vector<IngestExternalFileArg>& args) override;
606607

608+
using DB::PrepareFileIngestion;
609+
Status PrepareFileIngestion(
610+
const std::vector<IngestExternalFileArg>& args,
611+
std::unique_ptr<FileIngestionHandle>* handle) override;
612+
613+
Status CommitFileIngestionHandles(
614+
std::vector<std::unique_ptr<FileIngestionHandle>> handles) override;
615+
607616
using DB::CreateColumnFamilyWithImport;
608617
Status CreateColumnFamilyWithImport(
609618
const ColumnFamilyOptions& options, const std::string& column_family_name,
@@ -1916,6 +1925,7 @@ class DBImpl : public DB {
19161925
friend class WriteBatchWithIndex;
19171926
friend class WriteUnpreparedTxnDB;
19181927
friend class WriteUnpreparedTxn;
1928+
friend class FileIngestionHandleImpl;
19191929

19201930
friend class ForwardIterator;
19211931
friend struct SuperVersion;
@@ -2253,6 +2263,10 @@ class DBImpl : public DB {
22532263
void ReleaseFileNumberFromPendingOutputs(
22542264
std::unique_ptr<std::list<uint64_t>::iterator>& v);
22552265

2266+
// Rolls back one prepared file ingestion (delete its staged files, release
2267+
// the reserved file numbers)
2268+
void RollbackPreparedFileIngestion(FileIngestionHandleImpl* const h);
2269+
22562270
// Similar to pending_outputs, preserve OPTIONS file. Used for remote
22572271
// compaction.
22582272
std::list<uint64_t>::iterator CaptureOptionsFileNumber();
@@ -3472,6 +3486,10 @@ class DBImpl : public DB {
34723486
// REQUIRES: mutex held
34733487
int num_running_ingest_file_ = 0;
34743488

3489+
// Number of FileIngestionHandle objects produced by PrepareFileIngestion()
3490+
// that have not been committed or destroyed yet.
3491+
std::atomic<uint32_t> num_outstanding_prepared_ingestions_{0};
3492+
34753493
WalManager wal_manager_;
34763494

34773495
// A value of > 0 temporarily disables scheduling of background work

db/db_impl/db_impl_readonly.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,19 @@ class DBImplReadOnly : public DBImpl {
134134
return Status::NotSupported("Not supported operation in read only mode.");
135135
}
136136

137+
using DB::PrepareFileIngestion;
138+
Status PrepareFileIngestion(
139+
const std::vector<IngestExternalFileArg>& /*args*/,
140+
std::unique_ptr<FileIngestionHandle>* /*handle*/) override {
141+
return Status::NotSupported("Not supported operation in read only mode.");
142+
}
143+
144+
using DB::CommitFileIngestionHandles;
145+
Status CommitFileIngestionHandles(
146+
std::vector<std::unique_ptr<FileIngestionHandle>> /*handles*/) override {
147+
return Status::NotSupported("Not supported operation in read only mode.");
148+
}
149+
137150
using DB::CreateColumnFamilyWithImport;
138151
Status CreateColumnFamilyWithImport(
139152
const ColumnFamilyOptions& /*options*/,

db/db_impl/db_impl_secondary.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,19 @@ class DBImplSecondary : public DBImpl {
239239
return Status::NotSupported("Not supported operation in secondary mode.");
240240
}
241241

242+
using DB::PrepareFileIngestion;
243+
Status PrepareFileIngestion(
244+
const std::vector<IngestExternalFileArg>& /*args*/,
245+
std::unique_ptr<FileIngestionHandle>* /*handle*/) override {
246+
return Status::NotSupported("Not supported operation in secondary mode.");
247+
}
248+
249+
using DB::CommitFileIngestionHandles;
250+
Status CommitFileIngestionHandles(
251+
std::vector<std::unique_ptr<FileIngestionHandle>> /*handles*/) override {
252+
return Status::NotSupported("Not supported operation in secondary mode.");
253+
}
254+
242255
// Try to catch up with the primary by reading as much as possible from the
243256
// log files until there is nothing more to read or encounters an error. If
244257
// the amount of information in the log files to process is huge, this

db/db_test.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3516,6 +3516,18 @@ class ModelDB : public DB {
35163516
return Status::NotSupported("Not implemented");
35173517
}
35183518

3519+
using DB::PrepareFileIngestion;
3520+
Status PrepareFileIngestion(
3521+
const std::vector<IngestExternalFileArg>& /*args*/,
3522+
std::unique_ptr<FileIngestionHandle>* /*handle*/) override {
3523+
return Status::NotSupported("Not implemented.");
3524+
}
3525+
3526+
Status CommitFileIngestionHandles(
3527+
std::vector<std::unique_ptr<FileIngestionHandle>> /*handles*/) override {
3528+
return Status::NotSupported("Not implemented.");
3529+
}
3530+
35193531
using DB::CreateColumnFamilyWithImport;
35203532
Status CreateColumnFamilyWithImport(
35213533
const ColumnFamilyOptions& /*options*/,

db/external_sst_file_ingestion_job.cc

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,23 +76,10 @@ Status ExternalSstFileIngestionJob::Prepare(
7676
auto num_files = files_to_ingest_.size();
7777
if (num_files == 0) {
7878
return Status::InvalidArgument("The list of files is empty");
79-
} else if (num_files > 1) {
80-
// Verify that passed files don't have overlapping ranges
81-
autovector<const IngestedFileInfo*> sorted_files;
82-
for (size_t i = 0; i < num_files; i++) {
83-
sorted_files.push_back(&files_to_ingest_[i]);
84-
}
85-
86-
std::sort(sorted_files.begin(), sorted_files.end(), file_range_checker_);
87-
88-
for (size_t i = 0; i + 1 < num_files; i++) {
89-
if (file_range_checker_.Overlaps(*sorted_files[i], *sorted_files[i + 1],
90-
/* known_sorted= */ true)) {
91-
files_overlap_ = true;
92-
break;
93-
}
94-
}
9579
}
80+
// Detect whether the input files overlap one another; this drives how they
81+
// are divided into batches below.
82+
files_overlap_ = ComputeFilesOverlap(files_to_ingest_);
9683

9784
if (atomic_replace_range.has_value()) {
9885
atomic_replace_range_.emplace();
@@ -433,6 +420,60 @@ void ExternalSstFileIngestionJob::DivideInputFilesIntoBatches() {
433420
}
434421
}
435422

423+
bool ExternalSstFileIngestionJob::ComputeFilesOverlap(
424+
const autovector<IngestedFileInfo>& files) const {
425+
const size_t num_files = files.size();
426+
if (num_files <= 1) {
427+
return false;
428+
}
429+
// Verify whether the files have overlapping ranges by sorting copies of the
430+
// file ranges and checking adjacent pairs.
431+
autovector<const IngestedFileInfo*> sorted_files;
432+
for (size_t i = 0; i < num_files; i++) {
433+
sorted_files.push_back(&files[i]);
434+
}
435+
std::sort(sorted_files.begin(), sorted_files.end(), file_range_checker_);
436+
for (size_t i = 0; i + 1 < num_files; i++) {
437+
if (file_range_checker_.Overlaps(*sorted_files[i], *sorted_files[i + 1],
438+
/* known_sorted= */ true)) {
439+
return true;
440+
}
441+
}
442+
return false;
443+
}
444+
445+
Status ExternalSstFileIngestionJob::MergeForSameColumnFamily(
446+
ExternalSstFileIngestionJob* other) {
447+
assert(other != nullptr);
448+
assert(other != this);
449+
assert(cfd_ == other->cfd_);
450+
if (atomic_replace_range_.has_value() ||
451+
other->atomic_replace_range_.has_value()) {
452+
return Status::NotSupported(
453+
"cannot merge file ingestion handles for the same column family when "
454+
"atomic_replace_range is used");
455+
}
456+
if (!(ingestion_options_ == other->ingestion_options_)) {
457+
return Status::InvalidArgument(
458+
"file ingestion handles for the same column family must be prepared "
459+
"with the same IngestExternalFileOptions");
460+
}
461+
// Append the other job's prepared files after this job's so that, for any
462+
// overlapping keys, the other job's data wins via a higher assigned sequence
463+
// number -- the same semantics as passing all the files to a single ingestion
464+
// call in this order. Recompute overlap and rebuild the batches over the
465+
// union.
466+
for (IngestedFileInfo& file : other->files_to_ingest_) {
467+
files_to_ingest_.push_back(std::move(file));
468+
}
469+
other->files_to_ingest_.clear();
470+
other->file_batches_to_ingest_.clear();
471+
files_overlap_ = ComputeFilesOverlap(files_to_ingest_);
472+
file_batches_to_ingest_.clear();
473+
DivideInputFilesIntoBatches();
474+
return Status::OK();
475+
}
476+
436477
Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
437478
SuperVersion* super_version) {
438479
Status status;

db/external_sst_file_ingestion_job.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ class ExternalSstFileIngestionJob {
301301
return max_assigned_seqno_;
302302
}
303303

304+
// Merge another already-Prepare()d job for the SAME column family into this
305+
// one so both sets of files are committed by a single Run(). The other job's
306+
// files are appended after this job's, so for any overlapping keys the other
307+
// job's data wins via a higher assigned sequence number -- the same semantics
308+
// as passing all the files to a single ingestion call in this order. The
309+
// other job is left empty.
310+
Status MergeForSameColumnFamily(ExternalSstFileIngestionJob* other);
311+
304312
private:
305313
Status ResetTableReader(const std::string& external_file,
306314
uint64_t new_file_number,
@@ -334,6 +342,10 @@ class ExternalSstFileIngestionJob {
334342
// make one batch.
335343
void DivideInputFilesIntoBatches();
336344

345+
// Returns whether any two files in `files` have overlapping key ranges, by
346+
// sorting the file ranges and checking adjacent pairs.
347+
bool ComputeFilesOverlap(const autovector<IngestedFileInfo>& files) const;
348+
337349
// Assign level for the files in one batch. The files within one batch are not
338350
// overlapping, and we assign level to each file one after another.
339351
// If `prev_batch_uppermost_level` is specified, all files in this batch will
@@ -407,7 +419,7 @@ class ExternalSstFileIngestionJob {
407419
SnapshotList* db_snapshots_;
408420
autovector<IngestedFileInfo> files_to_ingest_;
409421
std::vector<FileBatchInfo> file_batches_to_ingest_;
410-
const IngestExternalFileOptions& ingestion_options_;
422+
const IngestExternalFileOptions ingestion_options_;
411423
std::optional<KeyRangeInfo> atomic_replace_range_;
412424
Directories* directories_;
413425
EventLogger* event_logger_;

0 commit comments

Comments
 (0)