Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a5e49fc
Gather upgrade results a different way
rajeee Aug 7, 2025
c56a1bf
efficient pp
rajeee Aug 11, 2025
ece7109
More schema cleanup
rajeee Aug 12, 2025
fdc3e19
Cleanup and gz fix
rajeee Aug 12, 2025
d26e6bb
Default job_id
rajeee Aug 13, 2025
13b29ed
Fix tests
rajeee Aug 13, 2025
c47ebec
Change filename
rajeee Aug 13, 2025
a4b7aa4
Add ultra low disk mode
rajeee Aug 13, 2025
9f48d45
Fix test and uld mode
rajeee Aug 13, 2025
8e147ff
Allow some failures
rajeee Aug 13, 2025
b1696a8
Always add eplusout_error and upgrade
rajeee Aug 14, 2025
91561d6
Merge branch 'develop' into ppv2
rajeee Aug 14, 2025
0d2c71a
Use new workflow in hpc run
rajeee Aug 15, 2025
1b59d31
Merge branch 'develop' into ppv2
rajeee Aug 15, 2025
8616be7
HPC fixes
rajeee Aug 15, 2025
10f4c8f
Use 5 significant digits in the timeseries output
rajeee Aug 16, 2025
055decf
Don't write statistics in parquet file for annual results
rajeee Aug 18, 2025
1497eba
Use nvme for workers as well
rajeee Aug 18, 2025
51bc579
Add upper limit to str length of eplusout_err and step_failures
rajeee Aug 18, 2025
3eb2d62
Disable enduse level emissions by default
rajeee Aug 19, 2025
2ad98dc
Fix the test
rajeee Aug 19, 2025
ed5287c
Use read_parquet to reduce file I/O
rajeee Aug 20, 2025
5eb233c
Use read_parquet to reduce file I/O
rajeee Aug 20, 2025
d24996a
Streaming parquet writer
rajeee Aug 22, 2025
2921d6f
Streaming parquet writer working for local
rajeee Aug 22, 2025
0530a8f
Path fix
rajeee Aug 22, 2025
d2c3f51
use pathlib
rajeee Aug 22, 2025
e24ded7
No need to create dirs beforehand
rajeee Aug 22, 2025
bbfcc47
Drop upgrade column before writing partitioned parquet
rajeee Aug 22, 2025
cdb8eca
Path fix
rajeee Aug 23, 2025
b6ed5a5
Handle ultra low disk mode
rajeee Aug 23, 2025
0acbf82
Skip individual ts writing for local
rajeee Aug 23, 2025
b8d2936
Make dir if not exist
rajeee Aug 23, 2025
aad4058
Write building_id in dataframe
rajeee Aug 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
with:
repository: NREL/resstock
path: resstock
ref: develop
ref: better-minimal-buildstock

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume this gets changed back after resstock gets updated too.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this PR relies on some changes on resstock currently only available on that branch. These two branches are mutually tied up together - one needs the other. So, once we are ready on both sides, we switch the branch to develop on both sides and merge both side in quick order.

- name: Remove AWS from resstock yaml
run: |
cd resstock
Expand Down
56 changes: 38 additions & 18 deletions buildstockbatch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import csv
from collections import defaultdict, Counter
import pprint
import pathlib

from buildstockbatch.__version__ import __schema_version__
from buildstockbatch import sampler, workflow_generator, postprocessing
Expand Down Expand Up @@ -107,11 +108,18 @@ def path_rel_to_projectfile(self, x):

def _get_weather_files(self):
if "weather_files_path" in self.cfg:
logger.debug("Copying weather files")
weather_file_path = self.cfg["weather_files_path"]
with zipfile.ZipFile(weather_file_path, "r") as zf:
logger.debug("Extracting weather files to: {}".format(self.weather_dir))
zf.extractall(self.weather_dir)
if os.path.isdir(weather_file_path):

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weather file path now can be directory too (not just a zipfile). Will make buildstock_local much faster if using pre-unzipped directory.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet!

if os.path.isdir(self.weather_dir) and os.path.samefile(self.weather_dir, weather_file_path):
logger.debug(f"Weather files already exist at {self.weather_dir}")
return
else:
logger.debug(f"Copying weather files from directory: {weather_file_path} to {self.weather_dir}")
shutil.copytree(weather_file_path, self.weather_dir, dirs_exist_ok=True)
else:
with zipfile.ZipFile(weather_file_path, "r") as zf:
logger.debug(f"Extracting weather files to: {self.weather_dir}")
zf.extractall(self.weather_dir)
else:
logger.debug("Downloading weather files")
r = requests.get(self.cfg["weather_files_url"], stream=True)
Expand Down Expand Up @@ -188,7 +196,9 @@ def make_sim_dir(building_id, upgrade_idx, base_dir, overwrite_existing=False):
return sim_id, sim_dir

@staticmethod
def cleanup_sim_dir(sim_dir, dest_fs, simout_ts_dir, upgrade_id, building_id, low_disk=False):
def get_timeseries_df(
sim_dir, dest_fs, simout_ts_dir, upgrade_id, building_id, low_disk="", skip_write: bool = False
):
"""Clean up the output directory for a single simulation.

:param sim_dir: simulation directory
Expand All @@ -201,14 +211,18 @@ def cleanup_sim_dir(sim_dir, dest_fs, simout_ts_dir, upgrade_id, building_id, lo
:type upgrade_id: int
:param building_id: building id from buildstock.csv
:type building_id: int
:param low_disk: If true, remove the simulation directory entirely to save disk space
:type low_disk: bool
:param low_disk: If "low_disk", remove the simulation directory entirely to save disk space.
If "ultra_low_disk_no_timeseries", remove the simulation directory entirely to
save disk space and also delete the timeseries parquet file.
:type low_disk: str
:param skip_write: If True, skip writing the timeseries parquet file to dest_fs. Return only.
"""

# Convert the timeseries data to parquet
# and copy it to the results directory
# and copy it to the results directory if skip_write is False
output_dir = os.path.join(sim_dir, "run")
timeseries_filepath = os.path.join(output_dir, "results_timeseries.csv")
tsdf = None
# FIXME: Allowing both names here for compatibility. Should consolidate on one timeseries filename.
if os.path.isfile(timeseries_filepath):
units_dict = read_csv(timeseries_filepath, nrows=1).transpose().to_dict()[0]
Expand Down Expand Up @@ -255,15 +269,23 @@ def get_clean_column_name(x):
return x.lower()

tsdf.rename(columns=get_clean_column_name, inplace=True)
postprocessing.write_dataframe_as_parquet(
tsdf,
dest_fs,
f"{simout_ts_dir}/up{upgrade_id:02d}/bldg{building_id:07d}.parquet",
)
tsdf["building_id"] = building_id
if not skip_write:
pathlib.Path(simout_ts_dir).mkdir(exist_ok=True, parents=True)
postprocessing.write_dataframe_as_parquet(
tsdf,
dest_fs,
f"{simout_ts_dir}/{building_id}-{upgrade_id}.parquet",
)

if low_disk:
shutil.rmtree(sim_dir, ignore_errors=True)
return
if (
low_disk == "ultra_low_disk_no_timeseries"
): # only delete after writing to allow testing of writing workflow
if os.path.exists(f"{simout_ts_dir}/up{upgrade_id:02d}/bldg{building_id:07d}.parquet"):
os.remove(f"{simout_ts_dir}/up{upgrade_id:02d}/bldg{building_id:07d}.parquet")
return tsdf

# Remove files already in data_point.zip
zipfilename = os.path.join(sim_dir, "run", "data_point.zip")
Expand All @@ -281,6 +303,7 @@ def get_clean_column_name(x):
reports_dir = os.path.join(sim_dir, "reports")
if os.path.isdir(reports_dir):
shutil.rmtree(reports_dir, ignore_errors=True)
return tsdf

@classmethod
def validate_project(cls, project_file):
Expand Down Expand Up @@ -958,7 +981,7 @@ def process_results(self, skip_combine=False, use_dask_cluster=True, continue_up

fs = self.get_fs()
if not skip_combine:
postprocessing.combine_results(fs, self.results_dir, self.cfg, do_timeseries=do_timeseries)
postprocessing.combine_results(fs, self.results_dir, self.cfg)

aws_conf = self.cfg.get("postprocessing", {}).get("aws", {})
if "s3" in aws_conf or "aws" in self.cfg:
Expand All @@ -980,6 +1003,3 @@ def process_results(self, skip_combine=False, use_dask_cluster=True, continue_up
finally:
if use_dask_cluster:
self.cleanup_dask()

keep_individual_timeseries = self.cfg.get("postprocessing", {}).get("keep_individual_timeseries", False)
postprocessing.remove_intermediate_files(fs, self.results_dir, keep_individual_timeseries)
97 changes: 77 additions & 20 deletions buildstockbatch/hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tempfile
import time
import csv
import polars as pl

from buildstockbatch.base import BuildStockBatchBase, SimulationExists
from buildstockbatch.utils import (
Expand All @@ -42,10 +43,12 @@
get_project_configuration,
read_csv,
get_bool_env_var,
get_data_dict_annual_ts_schema,
)
from buildstockbatch import postprocessing
from buildstockbatch.__version__ import __version__ as bsb_version
from buildstockbatch.exc import ValidationError
from buildstockbatch.streaming_parquet_writer import StreamingParquetWriters

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -130,10 +133,10 @@ def weather_dir(self):

def run_batch(self, sampling_only=False):
# Create simulation_output dir
sim_out_ts_dir = pathlib.Path(self.output_dir) / "results" / "simulation_output" / "timeseries"
os.makedirs(sim_out_ts_dir, exist_ok=True)
for i in range(0, self.num_upgrades + 1):
os.makedirs(sim_out_ts_dir / f"up{i:02d}")
for dir in ["timeseries", "annual", "timeseries_individual"]:
sim_out_dir = pathlib.Path(self.output_dir) / "results" / "simulation_output" / dir
for i in range(0, self.num_upgrades + 1):
(sim_out_dir / f"up{i:02d}").mkdir(parents=True, exist_ok=True)

# create destination_dir and copy housing_characteristics into it
logger.debug("Copying housing characteristics")
Expand Down Expand Up @@ -257,20 +260,58 @@ def run_job_batch(self, job_array_number):
@delayed
def run_building_d(i, upgrade_idx):
try:
return self.run_building(self.output_dir, self.cfg, args["n_datapoints"], i, upgrade_idx)
return self.run_building(
self.output_dir, self.cfg, args["n_datapoints"], i, upgrade_idx, job_array_number
)
except Exception:
with open(traceback_file_path, "a") as f:
txt = get_error_details()
txt = "\n" + "#" * 20 + "\n" + f"Traceback for building{i}\n" + txt
f.write(txt)
del txt
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
return {"building_id": i, "upgrade": upgrade_id}
annual_schema, ts_schema = get_data_dict_annual_ts_schema(self.cfg)
dpout = {"building_id": i, "upgrade": upgrade_id, "job_id": job_array_number}
dp_df = pl.from_dicts([dpout], schema=annual_schema, strict=False)
return (upgrade_id, i, "N/A", dp_df, None)

# Run the simulations, get the data_point_out.json info from each
tick = time.time()
with Parallel(n_jobs=-1, verbose=9) as parallel:
dpouts = parallel(itertools.starmap(run_building_d, args["batch"]))
parallel = Parallel(
n_jobs=-1,
verbose=9,
backend="threading",
return_as="generator_unordered",
)
results_generator = parallel(itertools.starmap(run_building_d, args["batch"]))

annual_schema, ts_schema = get_data_dict_annual_ts_schema(self.cfg)
baseline_writers = StreamingParquetWriters(
base_path=pathlib.Path(self.output_dir) / "results" / "simulation_output" / "annual",
number_of_dataframes_per_file=567000,
base_name="job{job_array_number}",
batch_size=50000,
polars_schema=annual_schema,
)
ts_writers = StreamingParquetWriters(
base_path=pathlib.Path(self.output_dir) / "results" / "parquet" / "timeseries",
number_of_dataframes_per_file=20,
base_name="job{job_array_number}",
batch_size=1,
polars_schema=ts_schema,
)
completed_jobs = []
for result in results_generator:
if result is None:
continue
upgrade_id, i, state, dp_df, ts_df = result
completed_jobs.append((upgrade_id, i))
if dp_df is not None:
baseline_writers.write(f"upgrade={upgrade_id}/", dp_df)
if ts_df is not None:
ts_writers.write(f"upgrade={upgrade_id}/state={state}", ts_df)
baseline_writers.close_all()
ts_writers.close_all()
tick = time.time() - tick
logger.info("Simulation time: {:.2f} minutes".format(tick / 60.0))

Expand Down Expand Up @@ -342,10 +383,12 @@ def run_building_d(i, upgrade_idx):
self.local_apptainer_img.unlink(missing_ok=True)

@classmethod
def run_building(cls, output_dir, cfg, n_datapoints, i, upgrade_idx=None):
def run_building(cls, output_dir, cfg, n_datapoints, i, upgrade_idx=None, job_array_number=0):
fs = LocalFileSystem()
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1

state = "N/A"
dp_df = None
ts_df = None
try:
sim_id, sim_dir = cls.make_sim_dir(i, upgrade_idx, os.path.join(cls.local_output_dir, "simulation_output"))
except SimulationExists as ex:
Expand Down Expand Up @@ -461,18 +504,31 @@ def run_building(cls, output_dir, cfg, n_datapoints, i, upgrade_idx=None):
except FileNotFoundError:
pass

# Clean up simulation directory
cls.cleanup_sim_dir(
sim_dir,
fs,
f"{output_dir}/results/simulation_output/timeseries",
upgrade_id,
i,
)

reporting_measures = cls.get_reporting_measures(cfg)
dpout = postprocessing.read_simulation_outputs(fs, reporting_measures, sim_dir, upgrade_id, i)
return dpout
state = dpout.get("BuildExistingModel.state", "N/A")
pd_ts_df = cls.get_timeseries_df(
sim_dir,
fs,
f"{output_dir}/results/simulation_output/timeseries_individual_buildings/by_state/upgrade={upgrade_id}/state={state}",
upgrade_id,
i,
skip_write=False,
)
dpout = {postprocessing.to_camelcase(key): value for key, value in dpout.items()}
dpout["job_id"] = 0 # Used by downstream code. For local run, job_id is always zero.
annual_schema, ts_schema = get_data_dict_annual_ts_schema(cfg)
dp_df = pl.from_dicts([dpout], schema=annual_schema, strict=False)
if pd_ts_df is not None:
sch_overrides = {col: dtype for col, dtype in ts_schema.items() if col in set(pd_ts_df.columns)}
ts_df = pl.from_pandas(pd_ts_df, schema_overrides=sch_overrides)
missing_cols = set(ts_schema.keys()) - set(ts_df.columns)
available_cols = [col for col in ts_schema.keys() if col in set(ts_df.columns)]
ts_df = ts_df.select(available_cols).with_columns(
[pl.lit(None).cast(dtype).alias(col) for col, dtype in ts_schema.items() if col in missing_cols]
)
ts_df = ts_df.select(ts_schema.keys())
return (upgrade_id, i, state, dp_df, ts_df)

@staticmethod
def _queue_jobs_env_vars() -> dict:
Expand Down Expand Up @@ -653,6 +709,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False,
"--output=postprocessing.out",
"--nodes=1",
":",
"--partition=nvme",
"--tmp=1000000",
"--mem={}".format(memory),
"--output=dask_workers.out",
Expand Down
Loading
Loading