Skip to content

Commit cdcd5b0

Browse files
committed
_BLOB_ALLOC_PERF_MODE
1 parent 8d15a12 commit cdcd5b0

4 files changed

Lines changed: 28 additions & 8 deletions

File tree

prime_backup/action/helpers/blob_allocator.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from typing_extensions import override
1212

1313
from prime_backup.action.helpers.blob_creator_chunked import ChunkedBlobCreator
14-
from prime_backup.action.helpers.blob_creator_common import BlobCreateContext, BlobFileChanged, VolatileBlobFile, LookupBlobBySizeRequest, LookupBlobByHashRequest, BlobLookupRequest, BlobLookupRoutine
14+
from prime_backup.action.helpers.blob_creator_common import BlobCreateContext, BlobFileChanged, VolatileBlobFile, LookupBlobBySizeRequest, LookupBlobByHashRequest, BlobLookupRequest, BlobLookupRoutine, _BLOB_ALLOC_PERF_MODE
1515
from prime_backup.action.helpers.blob_creator_direct import DirectBlobCreator
1616
from prime_backup.action.helpers.blob_pre_calc_result import BlobPrecalculateResult
1717
from prime_backup.action.helpers.blob_recorder import BlobRecorder
@@ -271,6 +271,9 @@ def init_blob_store(self):
271271
self.__ctx.blob_store_in_cow_fs = file_utils.does_fs_support_cow(bs_path)
272272

273273
def schedule_loop(self, gen_list: List[BlobLookupRoutine[schema.File]]) -> List[schema.File]:
274+
if _BLOB_ALLOC_PERF_MODE:
275+
return self.__schedule_loop_simple(gen_list)
276+
274277
files: List[schema.File] = []
275278

276279
schedule_queue: Deque[BlobLookupRoutine[schema.File]] = collections.deque()
@@ -308,5 +311,19 @@ def callback(g: BlobLookupRoutine[schema.File] = scheduled):
308311

309312
return files
310313

314+
def __schedule_loop_simple(self, gen_list: List[BlobLookupRoutine[schema.File]]) -> List[schema.File]:
315+
files: List[schema.File] = []
316+
317+
def gen_wrapper(g: BlobLookupRoutine[schema.File]):
318+
f = yield from g
319+
files.append(f)
320+
321+
for gen in gen_list:
322+
for query_req in gen_wrapper(gen):
323+
self.__batch_lookup_manager.query(query_req, lambda: None)
324+
self.__batch_lookup_manager.flush()
325+
return files
326+
327+
311328
def add_existing_sizes(self, existing_sizes: Dict[int, bool]):
312329
self.__blob_by_size_cache.update(existing_sizes)

prime_backup/action/helpers/blob_creator_chunked.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from __future__ import annotations
2-
31
import contextlib
42
import dataclasses
53
import enum
@@ -9,7 +7,7 @@
97
from pathlib import Path
108
from typing import BinaryIO, Dict, List, Optional
119

12-
from prime_backup.action.helpers.blob_creator_common import BlobLookupRoutine, BlobCreateContext, BlobCreatorBase
10+
from prime_backup.action.helpers.blob_creator_common import BlobLookupRoutine, BlobCreateContext, BlobCreatorBase, _BLOB_ALLOC_PERF_MODE
1311
from prime_backup.action.helpers.blob_pre_calc_result import BlobPrecalculateResult
1412
from prime_backup.action.helpers.chunk_grouper import ChunkGrouper
1513
from prime_backup.action.helpers.create_backup_utils import CreateBackupTimeCostKey, SourceFileNotFoundWrapper
@@ -251,7 +249,12 @@ def write_task(db_chunk_: schema.Chunk = db_chunk, compressor_: Compressor = com
251249
compressed = compressor_.compress_bytes(chunk_buf_)
252250
return _CompressedChunk(db_chunk_, compressed)
253251

254-
write_state.futures.append(pool.submit(write_task))
252+
if _BLOB_ALLOC_PERF_MODE:
253+
fut: Future[_CompressedChunk] = Future()
254+
fut.set_result(write_task())
255+
write_state.futures.append(fut)
256+
else:
257+
write_state.futures.append(pool.submit(write_task))
255258
write_state.pending_bytes += db_chunk.raw_size
256259

257260
self.__flush_compressed_chunk_futures(write_state, force=False)

prime_backup/action/helpers/blob_creator_common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
if TYPE_CHECKING:
2525
from prime_backup.action.helpers.pack_writer import PackWriter
2626

27+
_BLOB_ALLOC_PERF_MODE = False
2728
_T = TypeVar('_T')
2829

2930

prime_backup/action/helpers/blob_creator_direct.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def __create_blob_artifact(self, plan: _DirectBlobPlan) -> BlobLookupRoutine[_Di
167167
if plan.policy == _DirectBlobCreatePolicy.hash_once:
168168
return self.__create_by_hash_once(compressor, plan)
169169
if plan.policy in (_DirectBlobCreatePolicy.read_all, _DirectBlobCreatePolicy.default):
170-
return self.__create_by_prehashed_content(compressor, plan)
170+
return self.__create_by_hashed_content(compressor, plan)
171171
raise AssertionError('bad policy {!r}'.format(plan.policy))
172172

173173
def __create_by_copy_hash(self, compressor: Compressor) -> BlobLookupRoutine[_DirectBlobCreateResult]:
@@ -212,8 +212,7 @@ def __create_by_hash_once(self, compressor: Compressor, plan: _DirectBlobPlan) -
212212
file_utils.copy_file_fast(temp_file_path, blob_path)
213213
return _DirectBlobCreateResult.created(blob_hash, cr.read_size, cr.write_size)
214214

215-
def __create_by_prehashed_content(self, compressor: Compressor, plan: _DirectBlobPlan) -> _DirectBlobCreateResult:
216-
misc_utils.assert_true(plan.blob_hash is not None, 'blob_hash is None')
215+
def __create_by_hashed_content(self, compressor: Compressor, plan: _DirectBlobPlan) -> _DirectBlobCreateResult:
217216
blob_hash = misc_utils.ensure_type(plan.blob_hash, str)
218217
blob_path = blob_utils.get_blob_path(blob_hash)
219218

0 commit comments

Comments
 (0)