Skip to content

Commit eb36fb5

Browse files
committed
copy_file_obj_fast attempt
1 parent 8928de6 commit eb36fb5

8 files changed

Lines changed: 76 additions & 26 deletions

File tree

prime_backup/action/export_backup_action_zip.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import contextlib
22
import os
3-
import shutil
43
import stat
54
import time
65
import zipfile
@@ -14,6 +13,7 @@
1413
from prime_backup.constants.constants import BACKUP_META_FILE_NAME
1514
from prime_backup.db import schema
1615
from prime_backup.types.export_failure import ExportFailures
16+
from prime_backup.utils import file_utils
1717
from prime_backup.utils.io_types import SupportsReadBytes
1818

1919

@@ -56,7 +56,7 @@ def __export_file(self, blob_chunks_getter: BlobChunksGetter, zipf: zipfile.ZipF
5656

5757
def reader_csm(reader: SupportsReadBytes):
5858
with zipf.open(info, 'w') as zip_item:
59-
shutil.copyfileobj(reader, zip_item)
59+
file_utils.copy_file_obj_fast(reader, zip_item, estimate_read_size=file.blob_raw_size)
6060

6161
self._create_blob_exporter(blob_chunks_getter, file).export_as_reader(reader_csm)
6262
elif stat.S_ISDIR(file.mode):

prime_backup/action/helpers/blob_allocator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,15 +342,15 @@ def get_blob_path_for_write(h: str) -> Path:
342342

343343
blob_path = get_blob_path_for_write(blob_hash)
344344
with self.__time_costs.measure_time_cost(CreateBackupTimeCostKey.kind_io_copy):
345-
cr = compressor.copy_compressed(temp_file_path, blob_path, calc_hash=False)
345+
cr = compressor.copy_compressed(temp_file_path, blob_path, calc_hash=False, estimate_read_size=st.st_size)
346346
raw_size, stored_size = cr.read_size, cr.write_size
347347

348348
elif policy == _DirectBlobCreatePolicy.hash_once:
349349
# read once, compress+hash to temp file, then move
350350
misc_utils.assert_true(blob_hash is None, 'blob_hash should not be calculated')
351351
with self.__make_temp_file(src_path_md5) as temp_file_path:
352352
with self.__time_costs.measure_time_cost(CreateBackupTimeCostKey.kind_io_copy):
353-
cr = compressor.copy_compressed(src_path, temp_file_path, calc_hash=True, open_r_func=SourceFileNotFoundWrapper.open_rb)
353+
cr = compressor.copy_compressed(src_path, temp_file_path, calc_hash=True, estimate_read_size=st.st_size, open_r_func=SourceFileNotFoundWrapper.open_rb)
354354
check_changes(cr.read_size, None) # the size must be unchanged, to satisfy the uniqueness
355355

356356
raw_size, blob_hash, stored_size = cr.read_size, cr.read_hash, cr.write_size
@@ -391,7 +391,7 @@ def get_blob_path_for_write(h: str) -> Path:
391391
else:
392392
# copy+compress+hash to blob store
393393
with self.__time_costs.measure_time_cost(CreateBackupTimeCostKey.kind_io_copy):
394-
cr = compressor.copy_compressed(src_path, blob_path, calc_hash=True, open_r_func=SourceFileNotFoundWrapper.open_rb)
394+
cr = compressor.copy_compressed(src_path, blob_path, calc_hash=True, estimate_read_size=st.st_size, open_r_func=SourceFileNotFoundWrapper.open_rb)
395395
raw_size, stored_size = cr.read_size, cr.write_size
396396
actual_sah = SizeAndHash(cr.read_size, cr.read_hash)
397397
check_changes(actual_sah.size, actual_sah.hash)

prime_backup/action/helpers/blob_exporter.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import contextlib
22
import dataclasses
33
import functools
4-
import shutil
54
import threading
65
from abc import ABC, abstractmethod
76
from pathlib import Path
@@ -152,9 +151,9 @@ def __export_to_fs_direct(self, output_path: Path):
152151
with open(output_path, 'wb') as f_out:
153152
if self.verify_blob:
154153
bypass_reader = BypassReader(f_in, calc_hash=True)
155-
shutil.copyfileobj(bypass_reader, f_out)
154+
file_utils.copy_file_obj_fast(bypass_reader, f_out, estimate_read_size=self.blob.raw_size)
156155
else:
157-
shutil.copyfileobj(f_in, f_out)
156+
file_utils.copy_file_obj_fast(f_in, f_out, estimate_read_size=self.blob.raw_size)
158157
if self.verify_blob and bypass_reader is not None:
159158
self.__verify_exported_blob(bypass_reader.get_read_len(), bypass_reader.get_hash())
160159

@@ -169,9 +168,9 @@ def __export_to_fs_chunked(self, output_path: Path):
169168
with compressor.open_decompressed(chunk_path) as f_in:
170169
if self.verify_blob:
171170
bypass_reader = BypassReader(f_in, calc_hash=True, hash_method=chunk_utils.get_hash_method())
172-
shutil.copyfileobj(bypass_reader, f_out)
171+
file_utils.copy_file_obj_fast(bypass_reader, f_out, estimate_read_size=oc.chunk.raw_size)
173172
else:
174-
shutil.copyfileobj(f_in, f_out)
173+
file_utils.copy_file_obj_fast(f_in, f_out, estimate_read_size=oc.chunk.raw_size)
175174

176175
if self.verify_blob and bypass_reader is not None:
177176
self.__verify_exported_chunk(oc.chunk, bypass_reader.get_read_len(), bypass_reader.get_hash())

prime_backup/action/import_backup_action.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import shutil
32
from pathlib import Path
43
from typing import IO, Optional, List, Dict, Tuple
54

@@ -24,7 +23,7 @@
2423
from prime_backup.types.standalone_backup_format import StandaloneBackupFormat
2524
from prime_backup.types.tar_format import TarFormat
2625
from prime_backup.types.units import ByteCount
27-
from prime_backup.utils import blob_utils, misc_utils, chunk_utils, collection_utils
26+
from prime_backup.utils import blob_utils, misc_utils, chunk_utils, collection_utils, file_utils
2827
from prime_backup.utils.hash_utils import SizeAndHash
2928

3029

@@ -68,7 +67,7 @@ def __create_blob_file(self, file_reader: IO[bytes], sah: SizeAndHash) -> Tuple[
6867
compress_method: CompressMethod = self.config.backup.get_compress_method_from_size(sah.size)
6968
compressor = Compressor.create(compress_method)
7069
with compressor.open_compressed_bypassed(blob_path) as (writer, f):
71-
shutil.copyfileobj(file_reader, f)
70+
file_utils.copy_file_obj_fast(file_reader, f, estimate_read_size=sah.size)
7271

7372
return writer.get_write_len(), compress_method
7473

prime_backup/action/migrate_compress_method_action.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import shutil
21
import time
32
from concurrent.futures import Future
43
from pathlib import Path
@@ -13,7 +12,7 @@
1312
from prime_backup.db.session import DbSession
1413
from prime_backup.db.values import BlobStorageMethod
1514
from prime_backup.types.size_diff import SizeDiff
16-
from prime_backup.utils import blob_utils, chunk_utils
15+
from prime_backup.utils import blob_utils, chunk_utils, file_utils
1716
from prime_backup.utils.thread_pool import FailFastBlockingThreadPool
1817

1918
_OLD_BLOB_SUFFIX = '_old'
@@ -64,7 +63,7 @@ def __migrate_single_direct_blob(self, blob: schema.Blob) -> bool:
6463
blob_path.replace(old_trash_path)
6564
with decompressor.open_decompressed(old_trash_path) as f_src:
6665
with compressor.open_compressed_bypassed(blob_path) as (writer, f_dst):
67-
shutil.copyfileobj(f_src, f_dst)
66+
file_utils.copy_file_obj_fast(f_src, f_dst, estimate_read_size=blob.stored_size)
6867

6968
blob.compress = new_compress_method.name
7069
blob.stored_size = writer.get_write_len()
@@ -101,7 +100,7 @@ def __migrate_single_chunk(self, chunk: schema.Chunk) -> bool:
101100
chunk_path.replace(old_trash_path)
102101
with decompressor.open_decompressed(old_trash_path) as f_src:
103102
with compressor.open_compressed_bypassed(chunk_path) as (writer, f_dst):
104-
shutil.copyfileobj(f_src, f_dst)
103+
file_utils.copy_file_obj_fast(f_src, f_dst, estimate_read_size=chunk.stored_size)
105104

106105
chunk.compress = new_compress_method.name
107106
chunk.stored_size = writer.get_write_len()

prime_backup/compressors.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import contextlib
22
import dataclasses
33
import enum
4-
import shutil
54
from abc import abstractmethod, ABC
65
from typing import BinaryIO, Union, ContextManager, Tuple, Callable, Literal, Generator
76

87
from typing_extensions import Protocol, override
98

9+
from prime_backup.utils import file_utils
1010
from prime_backup.utils.bypass_io import BypassReader, BypassWriter
1111
from prime_backup.utils.path_like import PathLike
1212

@@ -43,6 +43,7 @@ def ensure_lib(cls):
4343
def copy_compressed(
4444
self, source_path: PathLike, dest_path: PathLike, *,
4545
calc_hash: bool = False,
46+
estimate_read_size: int = 0,
4647
open_r_func: Callable[[PathLike, "Literal['rb']"], BinaryIO] = open,
4748
open_w_func: Callable[[PathLike, "Literal['wb']"], BinaryIO] = open,
4849
) -> CopyCompressResult:
@@ -52,7 +53,7 @@ def copy_compressed(
5253
with open_r_func(source_path, 'rb') as f_in, open_w_func(dest_path, 'wb') as f_out:
5354
reader = BypassReader(f_in, calc_hash=calc_hash)
5455
writer = BypassWriter(f_out)
55-
self._copy_compressed(reader, writer)
56+
self._copy_compressed(reader, writer, estimate_read_size=estimate_read_size)
5657
return self.CopyCompressResult(reader.get_read_len(), reader.get_hash(), writer.get_write_len())
5758

5859
def copy_decompressed(
@@ -106,19 +107,19 @@ def open_decompressed_bypassed(self, source_path: PathLike) -> Generator[Tuple[B
106107
with self.decompress_stream(reader) as f_decompressed:
107108
yield reader, f_decompressed
108109

109-
def _copy_compressed(self, f_in: BinaryIO, f_out: BinaryIO):
110+
def _copy_compressed(self, f_in: BinaryIO, f_out: BinaryIO, *, estimate_read_size: int = 0):
110111
"""
111112
(f_in) --[compress]--> (f_out)
112113
"""
113114
with self.compress_stream(f_out) as compressed_out:
114-
shutil.copyfileobj(f_in, compressed_out)
115+
file_utils.copy_file_obj_fast(f_in, compressed_out, estimate_read_size=estimate_read_size)
115116

116-
def _copy_decompressed(self, f_in: BinaryIO, f_out: BinaryIO):
117+
def _copy_decompressed(self, f_in: BinaryIO, f_out: BinaryIO, *, estimate_read_size: int = 0):
117118
"""
118119
(f_in) --[decompress]--> (f_out)
119120
"""
120121
with self.decompress_stream(f_in) as compressed_in:
121-
shutil.copyfileobj(compressed_in, f_out)
122+
file_utils.copy_file_obj_fast(compressed_in, f_out, estimate_read_size=estimate_read_size)
122123

123124
@abstractmethod
124125
def compress_stream(self, f_out: BinaryIO) -> ContextManager[BinaryIO]:

prime_backup/db/db_file_backup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import contextlib
2-
import shutil
32
from pathlib import Path
43
from typing import Generator
54

5+
from prime_backup.utils import file_utils
66
from prime_backup.utils.io_types import SupportsReadBytes, SupportsWriteBytes
77

88

@@ -46,11 +46,11 @@ def create(self, skip_existing: bool):
4646
if skip_existing and self.backup_file.is_file():
4747
return
4848
with open(self.src_file, 'rb') as f_src, self.__open_backup_for_write() as f_dst:
49-
shutil.copyfileobj(f_src, f_dst)
49+
file_utils.copy_file_obj_fast(f_src, f_dst)
5050

5151
def restore(self):
5252
with open(self.src_file, 'wb') as f_dst, self.__open_backup_for_read() as f_src:
53-
shutil.copyfileobj(f_src, f_dst)
53+
file_utils.copy_file_obj_fast(f_src, f_dst)
5454

5555
def delete_all(self):
5656
self.backup_file.unlink(missing_ok=True)

prime_backup/utils/file_utils.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import errno
2+
import functools
23
import os
4+
import queue
35
import shutil
46
import stat
7+
from concurrent.futures import ThreadPoolExecutor
58
from pathlib import Path
69
from typing import Optional, Literal, BinaryIO, Callable
710

811
import psutil
912

1013
from prime_backup.utils import path_utils
14+
from prime_backup.utils.io_types import SupportsReadBytes, SupportsWriteBytes
1115

1216
HAS_COPY_FILE_RANGE = callable(getattr(os, 'copy_file_range', None))
1317

@@ -46,6 +50,54 @@ def copy_file_fast(
4650
shutil.copyfile(src_path, dst_path, follow_symlinks=False)
4751

4852

53+
class _ThreadedFastFileObjCopier:
54+
COPY_BUFSIZE = 1024 * 1024 if os.name == 'nt' else 4 * 1024 # == shutil.COPY_BUFSIZE
55+
MEMORY_CACHE_SIZE = 8 * 1048576
56+
57+
def __init__(self, concurrency: int):
58+
self.concurrency = concurrency
59+
self.thread_pool = ThreadPoolExecutor(max_workers=concurrency)
60+
61+
def copy(self, src: SupportsReadBytes, dst: SupportsWriteBytes):
62+
q: 'queue.Queue[Optional[bytes]]' = queue.Queue(maxsize=max(1, self.MEMORY_CACHE_SIZE // self.COPY_BUFSIZE))
63+
func_exited = False
64+
65+
# reference: shutil.copyfileobj
66+
q_put = q.put
67+
q_get = q.get
68+
buf_size = self.COPY_BUFSIZE
69+
read_func = src.read
70+
write_func = dst.write
71+
72+
def read_worker():
73+
try:
74+
while not func_exited and (read_buf := read_func(buf_size)):
75+
q_put(read_buf)
76+
finally:
77+
q_put(None)
78+
79+
future = self.thread_pool.submit(read_worker)
80+
try:
81+
while (write_buf := q_get()) is not None:
82+
write_func(write_buf)
83+
future.result()
84+
finally:
85+
func_exited = True
86+
87+
88+
@functools.lru_cache(None)
89+
def __get_copier():
90+
from prime_backup.config.config import Config
91+
return _ThreadedFastFileObjCopier(Config.get().get_effective_concurrency())
92+
93+
94+
def copy_file_obj_fast(src: SupportsReadBytes, dst: SupportsWriteBytes, *, estimate_read_size: int = 0):
95+
if estimate_read_size > 1048576 and False: # TODO
96+
__get_copier().copy(src, dst)
97+
else:
98+
shutil.copyfileobj(src, dst)
99+
100+
49101
def rm_rf(path: Path, *, missing_ok: bool = False):
50102
"""
51103
Does not follow symlink

0 commit comments

Comments
 (0)