Skip to content

Commit fc6baae

Browse files
committed
chunk migrate compress method
1 parent 81292b8 commit fc6baae

5 files changed

Lines changed: 188 additions & 41 deletions

File tree

TODO.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ CDC chunking
1919
- [ ] Performance optimization
2020
- [ ] Fuse support
2121
- [ ] TODO cleanup
22-
- [ ] Compress migration
22+
- [x] Compress migration
2323
- [ ] Document
2424

2525
## QOL

prime_backup/action/__init__.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import logging
55
import threading
66
from abc import ABC, abstractmethod
7-
from typing import TypeVar, Generic
7+
from typing import TypeVar, Generic, Optional
88

99
_T = TypeVar('_T')
10+
_S = TypeVar('_S')
1011

1112

1213
class Action(Generic[_T], ABC):
@@ -18,6 +19,8 @@ def __init__(self):
1819
self.logger: logging.Logger = logger.get()
1920
self.config: Config = Config.get()
2021

22+
self.__running_action: Optional[Action] = None
23+
2124
@abstractmethod
2225
def run(self) -> _T:
2326
...
@@ -27,5 +30,16 @@ def is_interruptable(self) -> bool:
2730

2831
def interrupt(self):
2932
self.is_interrupted.set()
30-
31-
# TODO: run_sub_action for interrupt chain
33+
if (action := self.__running_action) is not None:
34+
action.interrupt()
35+
36+
def run_action(self, action: 'Action[_S]', auto_interrupt: bool = True) -> _S:
    """
    Run another action as a sub-action of this one and return its result

    While the sub-action runs it is remembered as the running action, which is
    what lets interrupt() forward the interruption to it. Only one sub-action
    may run at a time.

    :param action: The sub-action to run
    :param auto_interrupt: If true and this action has already been interrupted,
        interrupt the sub-action before running it
    :return: Whatever ``action.run()`` returns
    :raise RuntimeError: If another sub-action is still registered as running
    """
    current = self.__running_action
    if current is not None:
        raise RuntimeError('Cannot run action twice at the same time, current: {}, new: {}'.format(current, action))

    # Register the sub-action first and check the interrupted flag afterwards,
    # so an interrupt() arriving in between is not lost
    self.__running_action = action
    try:
        already_interrupted = self.is_interrupted.is_set() if auto_interrupt else False
        if already_interrupted:
            action.interrupt()
        result = action.run()
    finally:
        self.__running_action = None
    return result

prime_backup/action/migrate_compress_method_action.py

Lines changed: 141 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import shutil
22
import time
3+
from concurrent.futures import Future
34
from pathlib import Path
4-
from typing import List, Tuple, Set
5+
from typing import List, Tuple, Set, Dict
56

67
from typing_extensions import override
78

@@ -10,8 +11,10 @@
1011
from prime_backup.db import schema
1112
from prime_backup.db.access import DbAccess
1213
from prime_backup.db.session import DbSession
14+
from prime_backup.db.values import BlobStorageMethod
1315
from prime_backup.types.size_diff import SizeDiff
14-
from prime_backup.utils import blob_utils
16+
from prime_backup.utils import blob_utils, chunk_utils
17+
from prime_backup.utils.thread_pool import FailFastBlockingThreadPool
1518

1619
_OLD_BLOB_SUFFIX = '_old'
1720

@@ -20,16 +23,35 @@ class MigrateCompressMethodAction(Action[SizeDiff]):
2023
def __init__(self, new_compress_method: CompressMethod):
    """
    :param new_compress_method: The compress method the stored data should be migrated to
    """
    super().__init__()
    self.new_compress_method = new_compress_method

    # Hashes of the blob / chunk files that have been rewritten so far.
    # The cleanup and rollback steps use them to locate the "_old" files
    self.__migrated_blob_hashes: List[str] = []
    self.__migrated_chunk_hashes: List[str] = []

    # Ids of the database objects affected by the rewrites,
    # collected for the later size-syncing phases of the migration
    self.__affected_fileset_ids: Set[int] = set()
    self.__affected_chunk_groups_ids: Set[int] = set()
2534

35+
def __update_files_for_blob_change(self, session: DbSession, changed_blobs_by_hash: Dict[str, schema.Blob]):
    """
    Copy the compress method and stored size of the changed blobs onto the files referencing them

    The fileset id of every touched file is added to the affected fileset id set

    :param session: The database session used to look up the files
    :param changed_blobs_by_hash: The changed blobs, keyed by blob hash
    """
    blob_hashes = list(changed_blobs_by_hash.keys())
    for file in session.get_file_by_blob_hashes(blob_hashes):
        source_blob = changed_blobs_by_hash[file.blob_hash]
        file.blob_compress, file.blob_stored_size = source_blob.compress, source_blob.stored_size
        self.__affected_fileset_ids.add(file.fileset_id)
41+
2642
@classmethod
def __get_blob_paths(cls, h: str) -> Tuple[Path, Path]:
    """
    Locate the files involved in migrating the blob with hash ``h``

    :return: A tuple of (the blob file path, the sibling path with the "_old" suffix
        that holds the original file during the migration)
    """
    live_path = blob_utils.get_blob_path(h)
    return live_path, live_path.parent / f'{live_path.name}{_OLD_BLOB_SUFFIX}'
3147

32-
def __migrate_blob(self, blob: schema.Blob) -> bool:
48+
@classmethod
def __get_chunk_paths(cls, h: str) -> Tuple[Path, Path]:
    """
    Locate the files involved in migrating the chunk with hash ``h``

    :return: A tuple of (the chunk file path, the sibling path with the "_old" suffix
        that holds the original file during the migration)
    """
    live_path = chunk_utils.get_chunk_path(h)
    return live_path, live_path.parent / f'{live_path.name}{_OLD_BLOB_SUFFIX}'
53+
54+
def __migrate_single_direct_blob(self, blob: schema.Blob) -> bool:
3355
new_compress_method = self.config.backup.get_compress_method_from_size(blob.raw_size, compress_method_override=self.new_compress_method)
3456
decompressor = Compressor.create(blob.compress)
3557
compressor = Compressor.create(new_compress_method)
@@ -46,26 +68,86 @@ def __migrate_blob(self, blob: schema.Blob) -> bool:
4668
blob.stored_size = writer.get_write_len()
4769
return True
4870

49-
def __migrate_blobs_and_sync_files(self, session: DbSession, blobs: List[schema.Blob]):
50-
blob_mapping = {}
51-
for blob in blobs:
52-
try:
53-
changed = self.__migrate_blob(blob)
54-
except Exception as e:
55-
self.logger.error('Migrate blob {} failed: {}'.format(blob, e))
56-
raise
57-
58-
if changed:
59-
blob_mapping[blob.hash] = blob
60-
self.__migrated_blob_hashes.append(blob.hash)
61-
62-
for file in session.get_file_by_blob_hashes(list(blob_mapping.keys())):
63-
blob = blob_mapping[file.blob_hash]
64-
file.blob_compress = blob.compress
65-
file.blob_stored_size = blob.stored_size
66-
self.__affected_fileset_ids.add(file.fileset_id)
71+
def __migrate_direct_blobs_and_sync_files(self, session: DbSession, blobs: List[schema.Blob]):
    """
    Re-compress a batch of direct blobs in a thread pool, then sync the files referencing them

    Hashes of the blobs whose file got rewritten are appended to the migrated
    blob hash list. If anything fails, the hashes of all remaining blobs of the
    batch are appended as well before the exception propagates, since the
    rollback step only restores files whose hash is in that list.

    :param session: The database session used to sync the affected files
    :param blobs: The direct blobs to migrate
    :raise Exception: Whatever a single blob migration raised
    """
    changed_blobs_by_hash: Dict[str, schema.Blob] = {}
    # Read the hashes up front, so the failure path does not need to touch the blob objects
    batch_hashes: List[str] = [blob.hash for blob in blobs]

    try:
        with FailFastBlockingThreadPool('migration') as pool:
            future_pairs: List[Tuple[schema.Blob, 'Future[bool]']] = []
            for blob in blobs:
                future = pool.submit(self.__migrate_single_direct_blob, blob)
                future_pairs.append((blob, future))
            for blob, future in future_pairs:
                try:
                    changed = future.result()
                except Exception as e:
                    self.logger.error('Migrate blob {} failed: {}'.format(blob, e))
                    raise
                if changed:
                    changed_blobs_by_hash[blob.hash] = blob
                    self.__migrated_blob_hashes.append(blob.hash)
    except Exception:
        # A failed task, or a finished task whose future was never consumed above,
        # may already have moved its blob file to the "_old" path.
        # Register every not-yet-recorded hash of the batch, so the rollback can restore them.
        # Hashes without an "_old" file are skipped by the rollback's is_file() check.
        # NOTE(review): assumes leaving the pool's with-block waits for in-flight workers,
        # as ThreadPoolExecutor does -- confirm for FailFastBlockingThreadPool
        self.__migrated_blob_hashes.extend(h for h in batch_hashes if h not in changed_blobs_by_hash)
        raise

    self.__update_files_for_blob_change(session, changed_blobs_by_hash)
90+
91+
def __migrate_single_chunk(self, chunk: schema.Chunk) -> bool:
    """
    Re-compress the file of a single chunk with the new compress method

    The chunk file is first moved to its "_old" path, then decompressed from there
    and written back to the original path with the new compressor. The "_old" file
    is kept. If the re-compression fails, the original file is moved back before
    the exception propagates, so no partially written file stays at the chunk path.

    :param chunk: The chunk to migrate. Its ``compress`` and ``stored_size``
        fields are updated on success
    :return: False if the chunk already uses the target compress method and
        nothing was done, True if the chunk file was rewritten
    """
    new_compress_method = self.config.backup.get_compress_method_from_size(chunk.raw_size, compress_method_override=self.new_compress_method)
    decompressor = Compressor.create(chunk.compress)
    compressor = Compressor.create(new_compress_method)
    if decompressor.get_method() == compressor.get_method():
        return False

    chunk_path, old_trash_path = self.__get_chunk_paths(chunk.hash)
    chunk_path.replace(old_trash_path)
    try:
        with decompressor.open_decompressed(old_trash_path) as f_src:
            with compressor.open_compressed_bypassed(chunk_path) as (writer, f_dst):
                shutil.copyfileobj(f_src, f_dst)
    except Exception:
        # Undo the move, overwriting whatever has been written to the chunk path so far
        old_trash_path.replace(chunk_path)
        raise

    chunk.compress = new_compress_method.name
    chunk.stored_size = writer.get_write_len()
    return True
107+
108+
def __migrate_chunks(self, session: DbSession, chunks: List[schema.Chunk]):
    """
    Re-compress a batch of chunks in a thread pool and collect the affected chunk group ids

    Hashes of the chunks whose file got rewritten are appended to the migrated
    chunk hash list. If anything fails, the hashes of all remaining chunks of the
    batch are appended as well before the exception propagates, since the
    rollback step only restores files whose hash is in that list.

    :param session: The database session used to look up the chunk groups of the changed chunks
    :param chunks: The chunks to migrate
    :raise Exception: Whatever a single chunk migration raised
    """
    changed_chunk_ids: Set[int] = set()
    recorded_hashes: Set[str] = set()
    # Read the hashes up front, so the failure path does not need to touch the chunk objects
    batch_hashes: List[str] = [chunk.hash for chunk in chunks]

    try:
        with FailFastBlockingThreadPool('migration') as pool:
            future_pairs: List[Tuple[schema.Chunk, 'Future[bool]']] = []
            for chunk in chunks:
                future = pool.submit(self.__migrate_single_chunk, chunk)
                future_pairs.append((chunk, future))
            for chunk, future in future_pairs:
                try:
                    changed = future.result()
                except Exception as e:
                    self.logger.error('Migrate chunk {} failed: {}'.format(chunk, e))
                    raise
                if changed:
                    changed_chunk_ids.add(chunk.id)
                    recorded_hashes.add(chunk.hash)
                    self.__migrated_chunk_hashes.append(chunk.hash)
    except Exception:
        # A failed task, or a finished task whose future was never consumed above,
        # may already have moved its chunk file to the "_old" path.
        # Register every not-yet-recorded hash of the batch, so the rollback can restore them.
        # Hashes without an "_old" file are skipped by the rollback's is_file() check.
        # NOTE(review): assumes leaving the pool's with-block waits for in-flight workers,
        # as ThreadPoolExecutor does -- confirm for FailFastBlockingThreadPool
        self.__migrated_chunk_hashes.extend(h for h in batch_hashes if h not in recorded_hashes)
        raise

    self.__affected_chunk_groups_ids.update(session.get_chunk_group_ids_by_chunk_ids(list(changed_chunk_ids)))
127+
128+
def __update_chunk_group_and_blobs_for_chunk_changes(self, session: DbSession):
    """
    Re-sync the stored sizes of the chunk groups, chunked blobs and files affected by the chunk migration

    Does nothing if no chunk group was affected. Otherwise, in order:
    1. recalculate the chunk stored size sum of every affected chunk group
    2. recalculate the stored size of every blob bound to those chunk groups
    3. sync the files referencing those blobs
    4. flush and expunge the session

    :param session: The database session to operate with
    :raise AssertionError: If a blob bound to an affected chunk group is not a chunked blob
    """
    if not self.__affected_chunk_groups_ids:
        return

    group_ids = list(self.__affected_chunk_groups_ids)

    chunk_groups = session.get_chunk_groups_by_ids(group_ids)
    for group in chunk_groups.values():
        group.chunk_stored_size_sum = session.calc_chunk_group_stored_size_sum(group.id)

    affected_blobs = session.get_blobs_by_chunk_group_ids(group_ids)
    blobs_by_hash: Dict[str, schema.Blob] = {}
    for blob in affected_blobs:
        if blob.storage_method != BlobStorageMethod.chunked.value:
            raise AssertionError('Blob {!r} is not a chunked blob'.format(blob))
        # NOTE(review): this sum is read back from the chunk group rows updated above,
        # presumably relying on the session auto-flushing them first -- confirm
        blob.stored_size = session.calc_chunked_blob_stored_size_sum(blob.id)
        blobs_by_hash[blob.hash] = blob
    self.__update_files_for_blob_change(session, blobs_by_hash)

    self.logger.info('Syncing {} affected chunk groups and {} affected blobs for chunk changes'.format(len(chunk_groups), len(affected_blobs)))
    session.flush_and_expunge_all()
67146

68147
def __update_fileset_and_backups(self, session: DbSession):
148+
if len(self.__affected_fileset_ids) == 0:
149+
return
150+
69151
fileset_ids = set(self.__affected_fileset_ids)
70152
backup_ids = session.get_backup_ids_by_fileset_ids(list(fileset_ids))
71153
self.logger.info('Syncing {} affected filesets and {} associated backups'.format(len(fileset_ids), len(backup_ids)))
@@ -86,24 +168,34 @@ def __update_fileset_and_backups(self, session: DbSession):
86168
fs_base = filesets[backup.fileset_id_base]
87169
fs_delta = filesets[backup.fileset_id_delta]
88170
backup.file_stored_size_sum = fs_base.file_stored_size_sum + fs_delta.file_stored_size_sum
171+
session.flush_and_expunge_all()
89172

90-
def __erase_old_blobs(self):
173+
def __erase_old_blob_and_chunk_files(self):
    """
    Delete the "_old" files of all migrated blobs and chunks

    This is the cleanup step after a successful migration. An "_old" file that
    is already gone is skipped rather than treated as an error, so one missing
    file does not abort the cleanup and leave the remaining "_old" files behind.
    """
    for h in self.__migrated_blob_hashes:
        _, old_trash_path = self.__get_blob_paths(h)
        old_trash_path.unlink(missing_ok=True)
    for h in self.__migrated_chunk_hashes:
        _, old_trash_path = self.__get_chunk_paths(h)
        old_trash_path.unlink(missing_ok=True)
94180

95181
def __rollback(self):
    """
    Move the "_old" files of all recorded blobs and chunks back to their original paths

    Hashes whose "_old" file does not exist are skipped
    """
    def restore(live_path, old_trash_path):
        if old_trash_path.is_file():
            old_trash_path.replace(live_path)

    for blob_hash in self.__migrated_blob_hashes:
        restore(*self.__get_blob_paths(blob_hash))
    for chunk_hash in self.__migrated_chunk_hashes:
        restore(*self.__get_chunk_paths(chunk_hash))
100190

101191
@override
102192
def run(self) -> SizeDiff:
103193
# Notes: requires 2x disk usage of the blob store, stores all blob hashes in memory
104-
self.__migrated_blob_hashes.clear()
105194
self.logger.info('Migrating compress method to {} (compress threshold = {})'.format(self.new_compress_method.name, self.config.backup.compress_threshold))
106195

196+
def get_actual_blob_store_stored_size_sum() -> int:
197+
return session.get_direct_blob_stored_size_sum() + session.get_chunk_stored_size_sum()
198+
107199
try:
108200
# Blob operation steps:
109201
# 1. move xxx -> xxx_old
@@ -112,28 +204,43 @@ def run(self) -> SizeDiff:
112204
with DbAccess.open_session() as session:
113205
# 0. fetch information before the migration
114206
t = time.time()
115-
before_size = session.get_blob_stored_size_sum()
116-
total_blob_count = session.get_blob_count()
207+
before_size = get_actual_blob_store_stored_size_sum()
117208

118-
# 1. migrate blob objects
209+
# 1. migrate direct blob objects
119210
cnt = 0
211+
total_blob_count = session.get_blob_count()
120212
for blobs in session.iterate_blob_batch(batch_size=1000):
121213
cnt += len(blobs)
122214
self.logger.info('Processing blobs {} / {}'.format(cnt, total_blob_count))
123-
self.__migrate_blobs_and_sync_files(session, blobs)
215+
self.__migrate_direct_blobs_and_sync_files(session, [
216+
blob for blob in blobs
217+
if blob.storage_method == BlobStorageMethod.direct.value
218+
])
124219
session.flush_and_expunge_all()
125-
126220
if len(self.__migrated_blob_hashes) == 0:
127221
self.logger.info('No blob needs a compress method change, nothing to migrate')
128222
else:
129-
self.logger.info('Migrated {} blobs and related files'.format(len(self.__migrated_blob_hashes)))
223+
self.logger.info('Migrated {} direct blobs and related files'.format(len(self.__migrated_blob_hashes)))
130224

131-
# 3. migrate backup data
132-
self.__update_fileset_and_backups(session)
225+
# 2. migrate chunk objects, and sync chunk groups and chunked blobs
226+
cnt = 0
227+
total_chunk_count = session.get_chunk_count()
228+
for chunks in session.iterate_chunk_batch(batch_size=1000):
229+
cnt += len(chunks)
230+
self.logger.info('Processing chunks {} / {}'.format(cnt, total_chunk_count))
231+
self.__migrate_chunks(session, chunks)
133232
session.flush_and_expunge_all()
233+
if len(self.__migrated_chunk_hashes) == 0:
234+
self.logger.info('No chunk needs a compress method change, nothing to migrate')
235+
else:
236+
self.logger.info('Migrated {} chunks and related blob and files'.format(len(self.__migrated_chunk_hashes)))
237+
238+
# 3. migrate affected fileset and backup data
239+
self.__update_chunk_group_and_blobs_for_chunk_changes(session)
240+
self.__update_fileset_and_backups(session)
134241

135242
# 4. output
136-
after_size = session.get_blob_stored_size_sum()
243+
after_size = get_actual_blob_store_stored_size_sum()
137244

138245
except Exception:
139246
self.logger.warning('Error occurs during compress method migration, applying rollback')
@@ -142,8 +249,8 @@ def run(self) -> SizeDiff:
142249

143250
else:
144251
# 5. migration done, do some cleanup
145-
self.logger.info('Cleaning up old blobs')
146-
self.__erase_old_blobs()
252+
self.logger.info('Cleaning up old blob and chunk files')
253+
self.__erase_old_blob_and_chunk_files()
147254

148255
self.config.backup.compress_method = self.new_compress_method
149256
self.logger.info('Compress method migration done, cost {}s'.format(round(time.time() - t, 2)))

prime_backup/db/session.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,12 @@ def get_blob_stored_size_sum(self) -> int:
245245
# FIXME: chunked blob
246246
return _int_or_0(self.session.execute(func.sum(schema.Blob.stored_size).select()).scalar_one())
247247

248+
def get_direct_blob_stored_size_sum(self) -> int:
    """
    Sum up the ``stored_size`` of all blobs whose storage method is ``direct``

    The raw query result is passed through ``_int_or_0``
    """
    statement = (
        select(func.sum(schema.Blob.stored_size)).
        where(schema.Blob.storage_method == BlobStorageMethod.direct.value)
    )
    total = self.session.execute(statement).scalar_one()
    return _int_or_0(total)
253+
248254
def get_blob_raw_size_sum(self) -> int:
249255
return _int_or_0(self.session.execute(func.sum(schema.Blob.raw_size).select()).scalar_one())
250256

@@ -270,6 +276,13 @@ def filtered_orphan_blob_hashes(self, hashes: List[str]) -> List[str]:
270276
if blob_hash not in good_hashes
271277
]
272278

279+
def calc_chunked_blob_stored_size_sum(self, blob_id: int) -> int:
    """
    Sum up the ``chunk_stored_size_sum`` of all chunk groups bound to the given blob

    The raw query result is passed through ``_int_or_0``

    :param blob_id: The id of the blob whose chunk group bindings are summed up
    """
    binding = schema.BlobChunkGroupBinding
    statement = (
        select(func.sum(schema.ChunkGroup.chunk_stored_size_sum)).
        join(binding, schema.ChunkGroup.id == binding.chunk_group_id).
        where(binding.blob_id == blob_id)
    )
    total = self.session.execute(statement).scalar_one()
    return _int_or_0(total)
285+
273286
# ===================================== Chunk =====================================
274287

275288
class CreateChunkKwargs(TypedDict):
@@ -332,6 +345,9 @@ def get_chunks_by_hashes(self, hashes: List[str]) -> Dict[str, Optional[schema.C
332345
result[chunk.hash] = chunk
333346
return result
334347

348+
def get_chunk_stored_size_sum(self) -> int:
    """
    Sum up the ``stored_size`` of all chunks

    The raw query result is passed through ``_int_or_0``
    """
    statement = select(func.sum(schema.Chunk.stored_size))
    total = self.session.execute(statement).scalar_one()
    return _int_or_0(total)
350+
335351
def list_chunks(self, limit: Optional[int] = None, offset: Optional[int] = None) -> List[schema.Chunk]:
336352
s = select(schema.Chunk)
337353
if limit is not None:
@@ -763,6 +779,13 @@ def delete_blob_chunk_group_bindings(self, identifiers: List[BlobChunkGroupBindi
763779
combined_condition = or_(*conditions)
764780
self.session.execute(delete(schema.BlobChunkGroupBinding).where(combined_condition))
765781

782+
def calc_chunk_group_stored_size_sum(self, chunk_group_id: int) -> int:
    """
    Sum up the ``stored_size`` of all chunks bound to the given chunk group

    The raw query result is passed through ``_int_or_0``

    :param chunk_group_id: The id of the chunk group whose chunk bindings are summed up
    """
    binding = schema.ChunkGroupChunkBinding
    statement = (
        select(func.sum(schema.Chunk.stored_size)).
        join(binding, schema.Chunk.id == binding.chunk_id).
        where(binding.chunk_group_id == chunk_group_id)
    )
    total = self.session.execute(statement).scalar_one()
    return _int_or_0(total)
788+
766789
# ===================================== File =====================================
767790

768791
class CreateFileKwargs(TypedDict):

prime_backup/utils/thread_pool.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
import queue
33
import threading
44
from concurrent.futures import ThreadPoolExecutor, Future
5-
from typing import List
5+
from typing import List, TypeVar, Callable
66

7-
from typing_extensions import override
7+
from typing_extensions import override, ParamSpec
88

99
from prime_backup.utils import misc_utils
1010

11+
_T = TypeVar('_T')
12+
_P = ParamSpec('_P')
13+
1114

1215
class FailFastBlockingThreadPool(ThreadPoolExecutor):
1316
"""
@@ -27,7 +30,7 @@ def __init__(self, name: str):
2730
self.__error_futures: 'queue.Queue[Future]' = queue.Queue()
2831

2932
@override
30-
def submit(self, __fn, *args, **kwargs):
33+
def submit(self, __fn: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> Future[_T]:
3134
func = functools.partial(__fn, *args, **kwargs)
3235

3336
def wrapper_func():

0 commit comments

Comments
 (0)