Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a5e49fc
Gather upgrade results a different way
rajeee Aug 7, 2025
c56a1bf
efficient pp
rajeee Aug 11, 2025
ece7109
More schema cleanup
rajeee Aug 12, 2025
fdc3e19
Cleanup and gz fix
rajeee Aug 12, 2025
d26e6bb
Default job_id
rajeee Aug 13, 2025
13b29ed
Fix tests
rajeee Aug 13, 2025
c47ebec
Change filename
rajeee Aug 13, 2025
a4b7aa4
Add ultra low disk mode
rajeee Aug 13, 2025
9f48d45
Fix test and uld mode
rajeee Aug 13, 2025
8e147ff
Allow some failures
rajeee Aug 13, 2025
b1696a8
Always add eplusout_error and upgrade
rajeee Aug 14, 2025
91561d6
Merge branch 'develop' into ppv2
rajeee Aug 14, 2025
0d2c71a
Use new workflow in hpc run
rajeee Aug 15, 2025
1b59d31
Merge branch 'develop' into ppv2
rajeee Aug 15, 2025
8616be7
HPC fixes
rajeee Aug 15, 2025
10f4c8f
Use 5 significant digits in the timeseries output
rajeee Aug 16, 2025
055decf
Don't write statistics in parquet file for annual results
rajeee Aug 18, 2025
1497eba
Use nvme for workers as well
rajeee Aug 18, 2025
51bc579
Add upper limit to str length of eplusout_err and step_failures
rajeee Aug 18, 2025
3eb2d62
Disable enduse level emissions by default
rajeee Aug 19, 2025
2ad98dc
Fix the test
rajeee Aug 19, 2025
ed5287c
Use read_parquet to reduce file I/O
rajeee Aug 20, 2025
5eb233c
Use read_parquet to reduce file I/O
rajeee Aug 20, 2025
d24996a
Streaming parquet writer
rajeee Aug 22, 2025
2921d6f
Streaming parquet writer working for local
rajeee Aug 22, 2025
0530a8f
Path fix
rajeee Aug 22, 2025
d2c3f51
use pathlib
rajeee Aug 22, 2025
e24ded7
No need to create dirs beforehand
rajeee Aug 22, 2025
bbfcc47
Drop upgrade column before writing partitioned parquet
rajeee Aug 22, 2025
cdb8eca
Path fix
rajeee Aug 23, 2025
b6ed5a5
Handle ultra low disk mode
rajeee Aug 23, 2025
0acbf82
Skip individual ts writing for local
rajeee Aug 23, 2025
b8d2936
Make dir if not exist
rajeee Aug 23, 2025
aad4058
Write building_id in dataframe
rajeee Aug 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
with:
repository: NREL/resstock
path: resstock
ref: develop
ref: better-minimal-buildstock

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this gets changed back to `develop` once resstock is updated too.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this PR relies on some changes in resstock that are currently only available on that branch. The two branches are tied together - each one needs the other. So, once we are ready on both sides, we switch the branch back to develop on both sides and merge both sides in quick succession.

- name: Remove AWS from resstock yaml
run: |
cd resstock
Expand Down
31 changes: 21 additions & 10 deletions buildstockbatch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,18 @@ def path_rel_to_projectfile(self, x):

def _get_weather_files(self):
if "weather_files_path" in self.cfg:
logger.debug("Copying weather files")
weather_file_path = self.cfg["weather_files_path"]
with zipfile.ZipFile(weather_file_path, "r") as zf:
logger.debug("Extracting weather files to: {}".format(self.weather_dir))
zf.extractall(self.weather_dir)
if os.path.isdir(weather_file_path):

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The weather file path can now be a directory too (not just a zip file). This will make buildstock_local much faster when using a pre-unzipped directory.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet!

if os.path.isdir(self.weather_dir) and os.path.samefile(self.weather_dir, weather_file_path):
logger.debug(f"Weather files already exist at {self.weather_dir}")
return
else:
logger.debug(f"Copying weather files from directory: {weather_file_path} to {self.weather_dir}")
shutil.copytree(weather_file_path, self.weather_dir, dirs_exist_ok=True)
else:
with zipfile.ZipFile(weather_file_path, "r") as zf:
logger.debug(f"Extracting weather files to: {self.weather_dir}")
zf.extractall(self.weather_dir)
else:
logger.debug("Downloading weather files")
r = requests.get(self.cfg["weather_files_url"], stream=True)
Expand Down Expand Up @@ -188,7 +195,7 @@ def make_sim_dir(building_id, upgrade_idx, base_dir, overwrite_existing=False):
return sim_id, sim_dir

@staticmethod
def cleanup_sim_dir(sim_dir, dest_fs, simout_ts_dir, upgrade_id, building_id, low_disk=False):
def cleanup_sim_dir(sim_dir, dest_fs, simout_ts_dir, upgrade_id, building_id, low_disk=""):
"""Clean up the output directory for a single simulation.

:param sim_dir: simulation directory
Expand All @@ -201,8 +208,10 @@ def cleanup_sim_dir(sim_dir, dest_fs, simout_ts_dir, upgrade_id, building_id, lo
:type upgrade_id: int
:param building_id: building id from buildstock.csv
:type building_id: int
:param low_disk: If true, remove the simulation directory entirely to save disk space
:type low_disk: bool
:param low_disk: If "low_disk", remove the simulation directory entirely to save disk space.
If "ultra_low_disk_no_timeseries", remove the simulation directory entirely to
save disk space and also delete the timeseries parquet file.
:type low_disk: str
"""

# Convert the timeseries data to parquet
Expand Down Expand Up @@ -263,6 +272,11 @@ def get_clean_column_name(x):

if low_disk:
shutil.rmtree(sim_dir, ignore_errors=True)
if (
low_disk == "ultra_low_disk_no_timeseries"
): # only delete after writing to allow testing of writing workflow
if os.path.exists(f"{simout_ts_dir}/up{upgrade_id:02d}/bldg{building_id:07d}.parquet"):
os.remove(f"{simout_ts_dir}/up{upgrade_id:02d}/bldg{building_id:07d}.parquet")
return

# Remove files already in data_point.zip
Expand Down Expand Up @@ -980,6 +994,3 @@ def process_results(self, skip_combine=False, use_dask_cluster=True, continue_up
finally:
if use_dask_cluster:
self.cleanup_dask()

keep_individual_timeseries = self.cfg.get("postprocessing", {}).get("keep_individual_timeseries", False)
postprocessing.remove_intermediate_files(fs, self.results_dir, keep_individual_timeseries)
27 changes: 23 additions & 4 deletions buildstockbatch/hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tempfile
import time
import csv
import polars as pl

from buildstockbatch.base import BuildStockBatchBase, SimulationExists
from buildstockbatch.utils import (
Expand All @@ -42,6 +43,7 @@
get_project_configuration,
read_csv,
get_bool_env_var,
get_data_dict_schema,
)
from buildstockbatch import postprocessing
from buildstockbatch.__version__ import __version__ as bsb_version
Expand Down Expand Up @@ -257,15 +259,25 @@ def run_job_batch(self, job_array_number):
@delayed
def run_building_d(i, upgrade_idx):
try:
return self.run_building(self.output_dir, self.cfg, args["n_datapoints"], i, upgrade_idx)
return self.run_building(
self.output_dir, self.cfg, args["n_datapoints"], i, upgrade_idx, job_array_number
)
except Exception:
with open(traceback_file_path, "a") as f:
txt = get_error_details()
txt = "\n" + "#" * 20 + "\n" + f"Traceback for building{i}\n" + txt
f.write(txt)
del txt
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
return {"building_id": i, "upgrade": upgrade_id}
dpout = {"building_id": i, "upgrade": upgrade_id, "job_id": job_array_number}
dp_df = pl.from_dict(dpout)
stock_type = self.cfg.get("stock_type", "residential")
full_schema = get_data_dict_schema(stock_type, dp_df.columns)
dp_df = dp_df.with_columns([pl.col(col).cast(dtype) for col, dtype in full_schema.items()])
dp_df.write_parquet(
f"{self.output_dir}/results/simulation_output/annual/up{upgrade_id:02d}/bldg{i:07d}.parquet"
)
return (upgrade_id, i)

# Run the simulations, get the data_point_out.json info from each
tick = time.time()
Expand Down Expand Up @@ -342,7 +354,7 @@ def run_building_d(i, upgrade_idx):
self.local_apptainer_img.unlink(missing_ok=True)

@classmethod
def run_building(cls, output_dir, cfg, n_datapoints, i, upgrade_idx=None):
def run_building(cls, output_dir, cfg, n_datapoints, i, upgrade_idx=None, job_array_number=0):
fs = LocalFileSystem()
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1

Expand Down Expand Up @@ -472,7 +484,14 @@ def run_building(cls, output_dir, cfg, n_datapoints, i, upgrade_idx=None):

reporting_measures = cls.get_reporting_measures(cfg)
dpout = postprocessing.read_simulation_outputs(fs, reporting_measures, sim_dir, upgrade_id, i)
return dpout
dpout = {postprocessing.to_camelcase(key): value for key, value in dpout.items()}
dpout["job_id"] = job_array_number # Used by downstream code. For local run, job_id is always zero.
dp_df = pl.from_dict(dpout)
stock_type = cfg.get("stock_type", "residential")
full_schema = get_data_dict_schema(stock_type, dp_df.columns)
dp_df = dp_df.with_columns([pl.col(col).cast(dtype) for col, dtype in full_schema.items()])
dp_df.write_parquet(f"{output_dir}/results/simulation_output/annual/up{upgrade_id:02d}/bldg{i:07d}.parquet")
return (upgrade_id, i)

@staticmethod
def _queue_jobs_env_vars() -> dict:
Expand Down
73 changes: 43 additions & 30 deletions buildstockbatch/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
import subprocess
import tarfile
import time
import polars as pl

from buildstockbatch.base import BuildStockBatchBase, SimulationExists
from buildstockbatch import postprocessing
from buildstockbatch.utils import log_error_details, ContainerRuntime, read_csv
from buildstockbatch.utils import log_error_details, ContainerRuntime, read_csv, get_data_dict_schema
from buildstockbatch.__version__ import __version__ as bsb_version

logger = logging.getLogger(__name__)
Expand All @@ -48,9 +49,10 @@ def __init__(self, project_filename):
self._weather_dir = None

# Create simulation_output dir
sim_out_ts_dir = pathlib.Path(self.results_dir, "simulation_output", "timeseries")
for i in range(0, len(self.cfg.get("upgrades", [])) + 1):
(sim_out_ts_dir / f"up{i:02d}").mkdir(exist_ok=True, parents=True)
for dir in ["timeseries", "annual"]:
sim_out_dir = pathlib.Path(self.results_dir, "simulation_output", dir)
for i in range(0, self.num_upgrades + 1):
(sim_out_dir / f"up{i:02d}").mkdir(exist_ok=True, parents=True)

# Install custom gems if requested
if self.cfg.get("baseline", dict()).get("custom_gems", False):
Expand Down Expand Up @@ -219,9 +221,16 @@ def run_building(
i,
low_disk=low_disk,
)
return dpout

def run_batch(self, n_jobs=None, measures_only=False, sampling_only=False, low_disk=False):
dpout = {postprocessing.to_camelcase(key): value for key, value in dpout.items()}
dpout["job_id"] = 0 # Used by downstream code. For local run, job_id is always zero.
dp_df = pl.from_dict(dpout)
stock_type = cfg.get("stock_type", "residential")
full_schema = get_data_dict_schema(stock_type, dp_df.columns)
dp_df = dp_df.with_columns([pl.col(col).cast(dtype) for col, dtype in full_schema.items()])
dp_df.write_parquet(f"{results_dir}/simulation_output/annual/up{upgrade_id:02d}/bldg{i:07d}.parquet")
return (upgrade_id, i)

def run_batch(self, n_jobs=None, measures_only=False, sampling_only=False, low_disk=""):
buildstock_csv_filename = self.sampler.run_sampling()

if sampling_only:
Expand Down Expand Up @@ -266,28 +275,16 @@ def run_batch(self, n_jobs=None, measures_only=False, sampling_only=False, low_d
all_sims = itertools.chain(*upgrade_sims)
if n_jobs is None:
n_jobs = -1
dpouts = Parallel(n_jobs=n_jobs, verbose=10)(all_sims)
completed_jobs = Parallel(n_jobs=n_jobs, verbose=10)(all_sims)

time.sleep(10)
shutil.rmtree(lib_path)
shutil.rmtree(lib_path, ignore_errors=True)

sim_out_path = pathlib.Path(self.results_dir, "simulation_output")

results_job_json_filename = sim_out_path / "results_job0.json.gz"
with gzip.open(results_job_json_filename, "wt", encoding="utf-8") as f:
json.dump(dpouts, f)
del dpouts

if low_disk:
return

sim_out_tarfile_name = sim_out_path / "simulations_job0.tar.gz"
logger.debug(f"Compressing simulation outputs to {sim_out_tarfile_name}")
with tarfile.open(sim_out_tarfile_name, "w:gz") as tarf:
for dirname in os.listdir(sim_out_path):
if re.match(r"up\d+", dirname) and (sim_out_path / dirname).is_dir():
tarf.add(sim_out_path / dirname, arcname=dirname)
shutil.rmtree(sim_out_path / dirname, ignore_errors=True)
results_job_json_filename = sim_out_path / "completed_jobs.json"
with open(results_job_json_filename, "w") as f:
json.dump(completed_jobs, f)

@property
def output_dir(self):
Expand Down Expand Up @@ -456,11 +453,6 @@ def main():
action="store_true",
help="Only apply the measures, but don't run simulations. Useful for debugging.",
)
parser.add_argument(
"--low-disk",
action="store_true",
help="Delete unused simulation result files immediately after processing to save disk space.",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--postprocessonly",
Expand Down Expand Up @@ -488,6 +480,19 @@ def main():
help="Only validate the project YAML file and references. Nothing is executed",
action="store_true",
)
group.add_argument(
"--low-disk",
action="store_true",
help="Delete unused simulation result files immediately after processing to save disk space.",
)
group.add_argument(
"--ultra-low-disk-no-timeseries",
action="store_true",
help=(
"Don't save timeseries data to save disk space. This is different from disabling timeseries in the yaml"
" as it will still process timeseries results (useful for testing) but not save the results."
),
)
group.add_argument("--samplingonly", help="Run the sampling only.", action="store_true")
args = parser.parse_args()
if not os.path.isfile(args.project_filename):
Expand All @@ -498,12 +503,20 @@ def main():
if args.validateonly:
return
batch = LocalBatch(args.project_filename)

if args.low_disk:
low_disk = "low_disk"
elif args.ultra_low_disk_no_timeseries:
low_disk = "ultra_low_disk_no_timeseries"
else:
low_disk = ""

if not (args.postprocessonly or args.uploadonly or args.validateonly or args.continue_upload):
batch.run_batch(
n_jobs=args.j,
measures_only=args.measures_only,
sampling_only=args.samplingonly,
low_disk=args.low_disk,
low_disk=low_disk,
)
if args.measures_only or args.samplingonly:
return
Expand Down
Loading
Loading