1111import time
1212from abc import ABC , abstractmethod
1313from pathlib import Path
14- from typing import List , Optional , Tuple , Callable , Any , Dict , Generator , Union , Set , Deque , Literal , BinaryIO
14+ from typing import List , Optional , Tuple , Callable , Any , Dict , Generator , Union , Set , Deque , Literal , BinaryIO , ContextManager , overload , TypeVar
1515
1616import pathspec
1717from typing_extensions import NoReturn , override , Self
@@ -195,15 +195,21 @@ def _batch_run(self):
195195 self .hashes .clear ()
196196
197197
198- class BatchQueryManager :
199- Reqs = Union [BlobBySizeFetcher .Req , BlobByHashFetcher .Req ]
200- Rsps = Union [ BlobBySizeFetcher . Rsp , BlobByHashFetcher . Rsp ]
198+ BqmReq = Union [ BlobBySizeFetcher . Req , BlobByHashFetcher . Req ]
199+ BqmRsp = Union [BlobBySizeFetcher .Rsp , BlobByHashFetcher .Rsp ]
200+
201201
202+ class BatchQueryManager :
202203 def __init__ (self , session : DbSession , size_result_cache : dict , hash_result_cache : dict , time_costs : TimeCostStats [_TimeCostKey ], * , max_batch_size : int = 100 ):
203204 self .fetcher_size = BlobBySizeFetcher (session , max_batch_size , size_result_cache , time_costs )
204205 self .fetcher_hash = BlobByHashFetcher (session , max_batch_size , hash_result_cache , time_costs )
205206
206- def query (self , query : Reqs , callback : Callable [[Rsps ], None ]):
207+ @overload
208+ def query (self , query : BlobBySizeFetcher .Req , callback : Callable [[BlobBySizeFetcher .Rsp ], None ]): ...
209+ @overload
210+ def query (self , query : BlobByHashFetcher .Req , callback : Callable [[BlobByHashFetcher .Rsp ], None ]): ...
211+
212+ def query (self ,query : BqmReq , callback : Callable [[BqmRsp ], None ]):
207213 if isinstance (query , BlobBySizeFetcher .Req ):
208214 self .fetcher_size .query (query , callback )
209215 elif isinstance (query , BlobByHashFetcher .Req ):
@@ -678,6 +684,86 @@ def __create_file(self, session: DbSession, path: Path) -> Generator[Any, Any, s
678684 blob = blob ,
679685 )
680686
687+ def __create_backup (self , session_context : ContextManager [DbSession ], session : DbSession ) -> BackupInfo :
688+ self .logger .info ('Scanning file for backup creation at path {!r}, targets: {}' .format (
689+ self .__source_path .as_posix (), self .config .backup .targets ,
690+ ))
691+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_scan_files ):
692+ scan_result = self .__scan_files ()
693+ backup = session .create_backup (
694+ creator = str (self .creator ),
695+ comment = self .comment ,
696+ targets = scan_result .root_targets ,
697+ tags = self .tags .to_dict (),
698+ )
699+ self .logger .info ('Creating backup for {} at path {!r}, file cnt {}, timestamp {!r}, creator {!r}, comment {!r}, tags {!r}' .format (
700+ scan_result .root_targets , self .__source_path .as_posix (), len (scan_result .all_files ),
701+ backup .timestamp , backup .creator , backup .comment , backup .tags ,
702+ ))
703+
704+ self .__pre_calculate_stats (scan_result )
705+ if self .config .backup .reuse_stat_unchanged_file :
706+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_reuse_unchanged_files ):
707+ self .__reuse_unchanged_files (session , scan_result )
708+ self .logger .info ('Reused {} / {} stat unchanged files' .format (len (self .__pre_calc_result .reused_files ), len (scan_result .all_files )))
709+ if self .config .get_effective_concurrency () > 1 :
710+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_pre_calculate_hash ):
711+ self .__pre_calculate_hash (session , scan_result )
712+ self .logger .info ('Pre-calculate all file hash done' )
713+
714+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_prepare_blob_store , _TimeCostKey .kind_fs ):
715+ blob_utils .prepare_blob_directories ()
716+ bs_path = blob_utils .get_blob_store ()
717+ self .__blob_store_st = bs_path .stat ()
718+ self .__blob_store_in_cow_fs = file_utils .does_fs_support_cow (bs_path )
719+
720+ @functools .lru_cache (None )
721+ def get_skip_missing_source_file_patterns () -> pathspec .GitIgnoreSpec :
722+ return pathspec .GitIgnoreSpec .from_lines (self .config .backup .creation_skip_missing_file_patterns )
723+
724+ def should_skip_missing_source_file (src_file_path : Path ) -> bool :
725+ if self .config .backup .creation_skip_missing_file :
726+ try :
727+ rel_path = src_file_path .relative_to (self .__source_path )
728+ except ValueError :
729+ self .logger .error ("Path {!r} is not inside the source path {!r}" .format (str (src_file_path ), str (self .__source_path )))
730+ else :
731+ return get_skip_missing_source_file_patterns ().match_file (rel_path )
732+ return False
733+
734+ files = []
735+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_create_files ):
736+ schedule_queue : Deque [Tuple [Generator [BqmReq , Optional [BqmRsp ], schema .File ], Optional [BqmRsp ]]] = collections .deque ()
737+ for file_entry in scan_result .all_files :
738+ schedule_queue .append ((self .__create_file (session , file_entry .path ), None ))
739+ while len (schedule_queue ) > 0 :
740+ gen , value = schedule_queue .popleft ()
741+ try :
742+ def callback (query_rsp : BqmRsp , g = gen ):
743+ schedule_queue .appendleft ((g , query_rsp ))
744+
745+ query_req = gen .send (value )
746+ self .__batch_query_manager .query (query_req , callback )
747+ except StopIteration as e :
748+ files .append (misc_utils .ensure_type (e .value , schema .File ))
749+ except _SourceFileNotFound as e :
750+ if should_skip_missing_source_file (e .file_path ):
751+ self .logger .warning ('Backup source file {!r} not found, suppressed and skipped by config' .format (str (e .file_path )))
752+ else :
753+ raise
754+
755+ self .__batch_query_manager .flush_if_needed ()
756+ if len (schedule_queue ) == 0 :
757+ self .__batch_query_manager .flush ()
758+
759+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_finalize ):
760+ self ._finalize_backup_and_files (session , backup , files )
761+ info = BackupInfo .of (backup )
762+
763+ with self .__time_costs .measure_time_cost (_TimeCostKey .stage_flush_db , _TimeCostKey .kind_db ):
764+ session_context .__exit__ (None , None , None )
765+ return info
766+
681767 @override
682768 def run (self ) -> BackupInfo :
683769 super ().run ()
@@ -694,84 +780,7 @@ def run(self) -> BackupInfo:
694780 session_context = DbAccess .open_session ()
695781 with session_context as session :
696782 self .__batch_query_manager = BatchQueryManager (session , self .__blob_by_size_cache , self .__blob_by_hash_cache , self .__time_costs )
697-
698- self .logger .info ('Scanning file for backup creation at path {!r}, targets: {}' .format (
699- self .__source_path .as_posix (), self .config .backup .targets ,
700- ))
701- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_scan_files ):
702- scan_result = self .__scan_files ()
703- backup = session .create_backup (
704- creator = str (self .creator ),
705- comment = self .comment ,
706- targets = scan_result .root_targets ,
707- tags = self .tags .to_dict (),
708- )
709- self .logger .info ('Creating backup for {} at path {!r}, file cnt {}, timestamp {!r}, creator {!r}, comment {!r}, tags {!r}' .format (
710- scan_result .root_targets , self .__source_path .as_posix (), len (scan_result .all_files ),
711- backup .timestamp , backup .creator , backup .comment , backup .tags ,
712- ))
713-
714- self .__pre_calculate_stats (scan_result )
715- if self .config .backup .reuse_stat_unchanged_file :
716- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_reuse_unchanged_files ):
717- self .__reuse_unchanged_files (session , scan_result )
718- self .logger .info ('Reused {} / {} stat unchanged files' .format (len (self .__pre_calc_result .reused_files ), len (scan_result .all_files )))
719- if self .config .get_effective_concurrency () > 1 :
720- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_pre_calculate_hash ):
721- self .__pre_calculate_hash (session , scan_result )
722- self .logger .info ('Pre-calculate all file hash done' )
723-
724- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_prepare_blob_store , _TimeCostKey .kind_fs ):
725- blob_utils .prepare_blob_directories ()
726- bs_path = blob_utils .get_blob_store ()
727- self .__blob_store_st = bs_path .stat ()
728- self .__blob_store_in_cow_fs = file_utils .does_fs_support_cow (bs_path )
729-
730- @functools .lru_cache (None )
731- def get_skip_missing_source_file_patterns () -> pathspec .GitIgnoreSpec :
732- return pathspec .GitIgnoreSpec .from_lines (self .config .backup .creation_skip_missing_file_patterns )
733-
734- def should_skip_missing_source_file (src_file_path : Path ) -> bool :
735- if self .config .backup .creation_skip_missing_file :
736- try :
737- rel_path = src_file_path .relative_to (self .__source_path )
738- except ValueError :
739- self .logger .error ("Path {!r} is not inside the source path {!r}" .format (str (src_file_path ), str (self .__source_path )))
740- else :
741- return get_skip_missing_source_file_patterns ().match_file (rel_path )
742- return False
743-
744- files = []
745- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_create_files ):
746- schedule_queue : Deque [Tuple [Generator , Any ]] = collections .deque ()
747- for file_entry in scan_result .all_files :
748- schedule_queue .append ((self .__create_file (session , file_entry .path ), None ))
749- while len (schedule_queue ) > 0 :
750- gen , value = schedule_queue .popleft ()
751- try :
752- def callback (query_rsp , g = gen ):
753- schedule_queue .appendleft ((g , query_rsp ))
754-
755- query_req = gen .send (value )
756- self .__batch_query_manager .query (query_req , callback )
757- except StopIteration as e :
758- files .append (misc_utils .ensure_type (e .value , schema .File ))
759- except _SourceFileNotFound as e :
760- if should_skip_missing_source_file (e .file_path ):
761- self .logger .warning ('Backup source file {!r} not found, suppressed and skipped by config' .format (str (e .file_path )))
762- else :
763- raise
764-
765- self .__batch_query_manager .flush_if_needed ()
766- if len (schedule_queue ) == 0 :
767- self .__batch_query_manager .flush ()
768-
769- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_finalize ):
770- self ._finalize_backup_and_files (session , backup , files )
771- info = BackupInfo .of (backup )
772-
773- with self .__time_costs .measure_time_cost (_TimeCostKey .stage_flush_db , _TimeCostKey .kind_db ):
774- session_context .__exit__ (None , None , None )
783+ info = self .__create_backup (session_context , session )
775784 except Exception as e :
776785 self ._apply_blob_rollback ()
777786 raise e
0 commit comments