Skip to content

Commit 2d48fb6

Browse files
committed
skip chunk calc in pre calc for non-coarse-grained chunk method
to avoid performance degradation on chunking if concurrency > 1
1 parent 6fc942b commit 2d48fb6

4 files changed

Lines changed: 74 additions & 29 deletions

File tree

prime_backup/action/helpers/blob_creator_chunked.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import time
66
from concurrent.futures import Future
77
from pathlib import Path
8-
from typing import BinaryIO, Dict, List, Optional
8+
from typing import BinaryIO, Dict, List, Optional, Union
99

1010
from prime_backup.action.helpers.blob_creator_common import BlobLookupRoutine, BlobCreateContext, BlobCreatorBase, _BLOB_ALLOC_PERF_MODE
1111
from prime_backup.action.helpers.blob_pre_calc_result import BlobPrecalculateResult
@@ -88,14 +88,14 @@ def get_or_create(self) -> BlobLookupRoutine[schema.Blob]:
8888

8989
with contextlib.ExitStack() as es:
9090
actual_path_to_read = self.__prepare_path_to_read(es, plan.policy)
91-
snapshot = self.__load_or_cut_chunks(actual_path_to_read, plan.pre_cal_result, src_path_str)
91+
92+
snapshot_or_blob = yield from self.__load_or_cut_chunks(actual_path_to_read, plan.pre_cal_result, src_path_str)
93+
if isinstance(snapshot_or_blob, schema.Blob):
94+
return snapshot_or_blob
95+
snapshot: _ChunkedBlobSnapshot = snapshot_or_blob
9296
if snapshot.blob_size == 0:
9397
self.log_and_raise_blob_file_changed('Blob size becomes zero', self.args.last_chance)
9498

95-
if (cache := (yield from self.query_cached_blob(snapshot.blob_hash))) is not None:
96-
self.logger.debug('Chunked file {} (hash {}) already exists in DB'.format(src_path_str, snapshot.blob_hash))
97-
return cache
98-
9999
# notes: the following code cannot be interrupted (yield).
100100
# The blob is specifically generated by the generator
101101
# if any yield is done, ensure to check blob_by_hash_cache again
@@ -155,26 +155,42 @@ def __prepare_path_to_read(self, es: contextlib.ExitStack, policy: _ChunkedBlobC
155155
return temp_file_path
156156
raise AssertionError('bad policy {!r}'.format(policy))
157157

158-
def __load_or_cut_chunks(self, actual_path_to_read: Path, pre_cal_result: Optional[BlobPrecalculateResult], src_path_str: str) -> _ChunkedBlobSnapshot:
158+
def __load_or_cut_chunks(self, actual_path_to_read: Path, pre_cal_result: Optional[BlobPrecalculateResult], src_path_str: str) -> BlobLookupRoutine[Union[_ChunkedBlobSnapshot, schema.Blob]]:
159+
pre_calc_blob_hash: Optional[str] = None
159160
if pre_cal_result is not None:
160-
chunks = pre_cal_result.chunks
161-
blob_hash = pre_cal_result.hash
162-
blob_size = pre_cal_result.size
163-
self.logger.debug('Cut and hashed file {} with size {} into {} chunks using {} (precalc)'.format(
164-
src_path_str, ByteCount(blob_size).auto_str(), len(chunks), self.args.chunk_method.name,
165-
))
166-
return _ChunkedBlobSnapshot(chunks, blob_hash, blob_size)
161+
pre_calc_blob_hash = pre_cal_result.hash
162+
if pre_cal_result.chunks is not None:
163+
chunks = pre_cal_result.chunks
164+
blob_size = pre_cal_result.size
165+
self.logger.debug('Cut and hashed file {} with size {} into {} chunks using {} (precalc)'.format(
166+
src_path_str, ByteCount(blob_size).auto_str(), len(chunks), self.args.chunk_method.name,
167+
))
168+
if (cache := (yield from self.query_cached_blob(pre_calc_blob_hash))) is not None:
169+
self.logger.debug('Chunked file {} (hash {}) already exists in DB'.format(src_path_str, pre_calc_blob_hash))
170+
return cache
171+
return _ChunkedBlobSnapshot(chunks, pre_calc_blob_hash, blob_size)
172+
173+
if pre_calc_blob_hash is not None:
174+
if (cache := (yield from self.query_cached_blob(pre_calc_blob_hash))) is not None:
175+
self.logger.debug('Chunked file {} (hash {}) already exists in DB'.format(src_path_str, pre_calc_blob_hash))
176+
return cache
167177

168178
previous_chunks = self.ctx.file_lookup.get_previous_chunks(self.args.src_path) if self.args.chunk_method.needs_previous_chunks() else None
169179
chunker = self.args.chunk_method.create_file_chunker(actual_path_to_read, need_entire_file_hash=True, previous_chunks=previous_chunks)
170180
with self.ctx.time_costs.measure_time_cost(CreateBackupTimeCostKey.kind_io_read) as chunking_cost:
171181
chunks = chunker.cut_all()
172182
blob_hash = chunker.get_entire_file_hash()
173183
blob_size = chunker.get_read_file_size()
184+
if pre_calc_blob_hash is not None and pre_calc_blob_hash != blob_hash:
185+
self.log_and_raise_blob_file_changed('Blob hash mismatch, pre calc {}, chunked {}'.format(pre_calc_blob_hash, blob_hash), self.args.last_chance)
186+
174187
self.logger.debug('Cut and hashed file {} with size {} into {} chunks using {} in {:.2f}s ({}/s)'.format(
175188
src_path_str, ByteCount(blob_size).auto_str(), len(chunks), self.args.chunk_method.name, chunking_cost(),
176189
ByteCount(blob_size / chunking_cost() if chunking_cost() > 0 else 0).auto_str(),
177190
))
191+
if pre_calc_blob_hash is None and (cache := (yield from self.query_cached_blob(blob_hash))) is not None:
192+
self.logger.debug('Chunked file {} (hash {}) already exists in DB'.format(src_path_str, blob_hash))
193+
return cache
178194
return _ChunkedBlobSnapshot(chunks, blob_hash, blob_size)
179195

180196
def __create_missing_chunks(self, actual_path_to_read: Path, snapshot: _ChunkedBlobSnapshot, known_db_chunks: Dict[str, Optional[schema.Chunk]]) -> _ChunkWriteResult:

prime_backup/action/helpers/blob_pre_calc_result.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import dataclasses
2+
import enum
23
from pathlib import Path
34
from typing import List, IO, Optional, Iterable
45

@@ -8,6 +9,21 @@
89
from prime_backup.utils.hash_utils import SizeAndHash
910

1011

12+
class CalcChunkPolicy(enum.Enum):
13+
AUTO = enum.auto()
14+
FALSE = enum.auto()
15+
TRUE = enum.auto()
16+
17+
def should_calculate_chunks(self, chunk_method: ChunkMethod) -> bool:
18+
if self == CalcChunkPolicy.AUTO:
19+
return chunk_method.should_precalculate_chunks()
20+
if self == CalcChunkPolicy.FALSE:
21+
return False
22+
if self == CalcChunkPolicy.TRUE:
23+
return True
24+
raise ValueError('unknown calc chunk policy {!r}'.format(self))
25+
26+
1127
@dataclasses.dataclass(frozen=True)
1228
class BlobPrecalculateResult:
1329
class SizeMismatched(Exception):
@@ -16,24 +32,27 @@ class SizeMismatched(Exception):
1632
size: int
1733
hash: str
1834
should_be_chunked: bool
19-
chunks: List[PrettyChunk]
35+
chunks: Optional[List[PrettyChunk]]
2036

2137
def simple_repr(self) -> str:
2238
return misc_utils.represent(self, attrs={
2339
'size': self.size,
2440
'hash': self.hash,
2541
'should_be_chunked': self.should_be_chunked,
26-
'chunks_len': len(self.chunks),
42+
'chunks_len': len(self.chunks) if self.chunks is not None else None,
2743
})
2844

2945
@classmethod
30-
def from_stream(cls, stream: IO[bytes], rel_path: Path, size: int) -> 'BlobPrecalculateResult':
46+
def from_stream(cls, stream: IO[bytes], rel_path: Path, size: int, *, calc_chunk_policy: CalcChunkPolicy = CalcChunkPolicy.AUTO) -> 'BlobPrecalculateResult':
3147
chunk_method = ChunkMethod.get_for_file(rel_path, size)
32-
chunks: List[PrettyChunk] = []
48+
chunks: Optional[List[PrettyChunk]] = None
3349
if chunk_method is not None:
34-
chunker = chunk_method.create_stream_chunker(stream, need_entire_file_hash=True)
35-
chunks = chunker.cut_all()
36-
sah = SizeAndHash(chunker.get_read_file_size(), chunker.get_entire_file_hash())
50+
if calc_chunk_policy.should_calculate_chunks(chunk_method):
51+
chunker = chunk_method.create_stream_chunker(stream, need_entire_file_hash=True)
52+
chunks = chunker.cut_all()
53+
sah = SizeAndHash(chunker.get_read_file_size(), chunker.get_entire_file_hash())
54+
else:
55+
sah = hash_utils.calc_reader_size_and_hash(stream)
3756
else:
3857
sah = hash_utils.calc_reader_size_and_hash(stream)
3958
if sah.size != size:
@@ -47,13 +66,16 @@ def from_stream(cls, stream: IO[bytes], rel_path: Path, size: int) -> 'BlobPreca
4766
)
4867

4968
@classmethod
50-
def from_file(cls, path: Path, rel_path: Path, size: int, *, previous_chunks: Optional[Iterable[PrettyChunk]] = None) -> 'BlobPrecalculateResult':
69+
def from_file(cls, path: Path, rel_path: Path, size: int, *, calc_chunk_policy: CalcChunkPolicy = CalcChunkPolicy.AUTO, previous_chunks: Optional[Iterable[PrettyChunk]] = None) -> 'BlobPrecalculateResult':
5170
chunk_method = ChunkMethod.get_for_file(rel_path, size)
52-
chunks: List[PrettyChunk] = []
71+
chunks: Optional[List[PrettyChunk]] = None
5372
if chunk_method is not None:
54-
chunker = chunk_method.create_file_chunker(path, need_entire_file_hash=True, previous_chunks=previous_chunks)
55-
chunks = chunker.cut_all()
56-
sah = SizeAndHash(chunker.get_read_file_size(), chunker.get_entire_file_hash())
73+
if calc_chunk_policy.should_calculate_chunks(chunk_method):
74+
chunker = chunk_method.create_file_chunker(path, need_entire_file_hash=True, previous_chunks=previous_chunks)
75+
chunks = chunker.cut_all()
76+
sah = SizeAndHash(chunker.get_read_file_size(), chunker.get_entire_file_hash())
77+
else:
78+
sah = hash_utils.calc_file_size_and_hash(path)
5779
else:
5880
sah = hash_utils.calc_file_size_and_hash(path)
5981
if sah.size != size:

prime_backup/action/import_backup_action.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from prime_backup.action import Action
88
from prime_backup.action.helpers.backup_finalizer import BackupFinalizer
9-
from prime_backup.action.helpers.blob_pre_calc_result import BlobPrecalculateResult
9+
from prime_backup.action.helpers.blob_pre_calc_result import BlobPrecalculateResult, CalcChunkPolicy
1010
from prime_backup.action.helpers.blob_recorder import BlobRecorder
1111
from prime_backup.action.helpers.chunk_grouper import ChunkGrouper
1212
from prime_backup.action.helpers.pack_writer import PackWriter
@@ -251,14 +251,14 @@ def __import_packed_backup_file(self, session: DbSession, file_holder: PackedBac
251251
for i, member in enumerate(members):
252252
if member.is_file():
253253
with member.open() as f:
254-
pre_cal_dict[i] = BlobPrecalculateResult.from_stream(f, Path(member.path), member.size)
254+
pre_cal_dict[i] = BlobPrecalculateResult.from_stream(f, Path(member.path), member.size, calc_chunk_policy=CalcChunkPolicy.TRUE)
255255

256256
for h, blob in session.get_blobs_by_hashes_opt([res.hash for res in pre_cal_dict.values()]).items():
257257
if blob is not None:
258258
self.__blob_cache[h] = blob
259259

260260
for h, chunk in session.get_chunks_by_hashes_opt(collection_utils.deduplicated_list(
261-
c.hash for res in pre_cal_dict.values() for c in res.chunks
261+
c.hash for res in pre_cal_dict.values() for c in (res.chunks or [])
262262
)).items():
263263
if chunk is not None:
264264
self.__chunk_cache[h] = chunk

prime_backup/types/chunk_method.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,10 @@ def create_stream_chunker(self, stream: IO[bytes], need_entire_file_hash: bool)
5252

5353
def needs_previous_chunks(self) -> bool:
5454
return self.value.needs_previous_chunks()
55+
56+
def should_precalculate_chunks(self) -> bool:
57+
"""
58+
Return True if this method is coarse-grained enough for parallel precalculation.
59+
Finer chunks can reduce throughput when multiple files are chunked concurrently.
60+
"""
61+
return self in (ChunkMethod.fastcdc_1m, ChunkMethod.fixed_1m)

0 commit comments

Comments
 (0)