Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c8c3f43
Initial test
rnichol1 Mar 16, 2026
76e8c73
Copy paste error
rnichol1 Mar 16, 2026
0402abb
Merge branch 'qat' into mod/FIR-36705
boozallendanny Mar 16, 2026
c11524f
delete bad chars
boozallendanny Mar 16, 2026
64cd884
imports and syntax
rnichol1 Mar 16, 2026
4b8551a
syntax
rnichol1 Mar 17, 2026
6fc427d
syntax?
rnichol1 Mar 17, 2026
73ef119
syntax
rnichol1 Mar 17, 2026
1619965
syntax
rnichol1 Mar 17, 2026
6d69d21
syntax
rnichol1 Mar 17, 2026
f91c469
syntax
rnichol1 Mar 17, 2026
74d98fd
[DEV-14767] Updating IIQ User Guide Links in Permissions
dpb-bah Mar 18, 2026
0ca59d2
add assisstance publish to backend repo
boozallendanny Mar 18, 2026
f6863a0
usgs extract into broker rpo
boozallendanny Mar 18, 2026
6b9ee9f
add file name change script
boozallendanny Mar 19, 2026
8350a81
Ignoring 0-value rows in file B for A30.2
alburde1 Mar 19, 2026
92eaac3
public files bucket for fapc
boozallendanny Mar 20, 2026
a05faa1
[FIR-36705] Cleaning up per checks
dpb-bah Mar 20, 2026
ba53aa2
Updating A33 with exception TAS
alburde1 Mar 23, 2026
393dcc8
Updating rule B21 to include the TAS exception
alburde1 Mar 23, 2026
a20fa6d
Merge pull request #2856 from fedspendingtransparency/mod/dev-14767-i…
alburde1 Mar 23, 2026
77c787a
black
alburde1 Mar 23, 2026
87a952b
update load assistance listing
boozallendanny Mar 24, 2026
b1bef9f
update bucket ref
boozallendanny Mar 25, 2026
4d106be
internal read zips and internal boc
boozallendanny Mar 25, 2026
dd9c99c
logging
boozallendanny Mar 25, 2026
1e11767
Merge branch 'qat' into mod/dev-14803-a30-ignore-zero-rows
dpb-bah Mar 26, 2026
2380c2a
Merge pull request #2858 from fedspendingtransparency/mod/dev-14803-a…
dpb-bah Mar 26, 2026
db414d5
Merge branch 'qat' into mod/dev-14805-tas-exception-sql
alburde1 Mar 26, 2026
fa334f5
Merge pull request #2859 from fedspendingtransparency/mod/dev-14805-t…
dpb-bah Mar 26, 2026
548880e
cleanup
boozallendanny Mar 27, 2026
089645c
cleanu
boozallendanny Mar 27, 2026
efb8e6f
[FIR-36705] Cleaning up to minimize duplication and additional changes
dpb-bah Mar 27, 2026
8979d94
[FIR-36705] Flipping signed <> fapc
dpb-bah Mar 27, 2026
269f7ce
[FIR-36705] Additional cleanup to minimize additional changes
dpb-bah Mar 27, 2026
ddc358c
Merge branch 'qat' into mod/FIR-36705
dpb-bah Mar 31, 2026
fc3830e
Merge pull request #2857 from fedspendingtransparency/mod/FIR-36705
dpb-bah Mar 31, 2026
0b9232d
Merge pull request #2860 from fedspendingtransparency/qat
dpb-bah Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions dataactbroker/helpers/script_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,15 @@ def _flatten(list_item, name=""):
return out


def get_prefixed_file_list(file_path, aws_prefix, bucket_name="sf_133_bucket", file_extension="csv"):
def get_prefixed_file_list(file_path, aws_prefix, bucket_name="sf_133_bucket", file_extension="csv", signed=True):
"""Get a list of files starting with the given prefix

Args:
file_path: path to where files are stored
aws_prefix: prefix to filter which files to pull from AWS
bucket_name: name of the bucket from which to pull the files
file_extension: the extension of the files to look for
signed: whether to pull presigned S3 urls or not

Returns:
A list of tuples containing information about existing
Expand All @@ -285,13 +286,24 @@ def get_prefixed_file_list(file_path, aws_prefix, bucket_name="sf_133_bucket", f
if CONFIG_BROKER["use_aws"]:
# get list of prefixed files in the config bucket on S3
s3_client = boto3.client("s3", region_name=CONFIG_BROKER["aws_region"])
response = s3_client.list_objects_v2(Bucket=CONFIG_BROKER[bucket_name], Prefix=aws_prefix)
file_list = []
for obj in response.get("Contents", []):
file_url = s3_client.generate_presigned_url(
"get_object", {"Bucket": CONFIG_BROKER[bucket_name], "Key": obj["Key"]}, ExpiresIn=600
)
file_list.append(FileInfo(file_url, obj["Key"]))

if signed:
response = s3_client.list_objects_v2(Bucket=CONFIG_BROKER[bucket_name], Prefix=aws_prefix)
file_list = []
for obj in response.get("Contents", []):
file_url = s3_client.generate_presigned_url(
"get_object", {"Bucket": CONFIG_BROKER[bucket_name], "Key": obj["Key"]}, ExpiresIn=600
)
file_list.append(FileInfo(file_url, obj["Key"]))
else:
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=CONFIG_BROKER[bucket_name], Prefix=aws_prefix)
file_list = []
for page in pages:
for obj in page.get("Contents", []):
key = obj["Key"]
s3_object = s3_client.get_object(Bucket=CONFIG_BROKER[bucket_name], Key=key)
file_list.append(FileInfo(s3_object, key))
else:
file_list = []

Expand Down
1 change: 1 addition & 0 deletions dataactcore/broker_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def deep_merge(left, right):
}


# TODO: Not logging correctly in either remote environment
def configure_logging(service_name="broker"):
config = DEFAULT_CONFIG
if "python_config" in CONFIG_LOGGING:
Expand Down
90 changes: 90 additions & 0 deletions dataactcore/scripts/pipeline/assistance_listing_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import boto3
from botocore.handlers import disable_signing
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
import filecmp
import json
import argparse
import logging

logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger()


# Purpose of this script is to check the 'falextracts' AssistanceListing source against our current version, and update
# if necessary. The AssistanceListing source is updated on Saturdays using the ASSISTANCE_LISTING_FILE_FORMAT
# as the path (we do not have access to List anything in the bucket, so the script will break if the source file
# format changes.)

ASSISTANCE_LISTING_FILE_FORMAT = (
"Assistance Listings/usaspendinggov/%Y/%m-%b/AssistanceListings_USASpendingGov_PUBLIC_WEEKLY_%Y%m%d.csv"
)


def get_parser():
parser = argparse.ArgumentParser(description="Get NationalFedCodes file from USGS")
parser.add_argument("--bucket", "-b", type=str, required=True, help="public bucket to download from/upload to")
return parser


if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()

script_output = {}

s3_reso = boto3.resource("s3", region_name="us-east-1")
public_files_bucket_name = args.bucket
fal_extracts_bucket_name = "falextracts"

# Ignore access keys...
s3_reso.meta.client.meta.events.register("choose-signer.s3.*", disable_signing)
falextracts_bucket = s3_reso.Bucket(fal_extracts_bucket_name)

# Calculate object name to pull from falextracts
today = datetime.now()

if today.strftime("%A") == "Saturday":
# Check if today's file exists yet, if not, try last week's...
al_s3_key = today.strftime(ASSISTANCE_LISTING_FILE_FORMAT)
try:
falextracts_bucket.Object(al_s3_key).load()
except ClientError:
logger.info("File at '{}'' not found. Trying last week's file...".format(al_s3_key))
al_s3_key = (today - timedelta(days=7)).strftime(ASSISTANCE_LISTING_FILE_FORMAT)

else:
# Get "last Saturday" by modulo
al_s3_key = (today - timedelta(days=((today.isoweekday() + 1) % 7))).strftime(ASSISTANCE_LISTING_FILE_FORMAT)

logger.info("Attempting to download falextracts AssistanceListing file '{}'...".format(al_s3_key))
falextracts_bucket.download_file(al_s3_key, "falextracts_cfda.csv")
script_output["source_file_path"] = al_s3_key
script_output["source_file_date"] = str(falextracts_bucket.Object(al_s3_key).last_modified)

logger.info("Downloading current USAspending-published AssistanceListing file...")
usaspending_s3_reso = boto3.resource("s3", region_name="us-gov-west-1")
usaspending_s3_reso.Bucket(public_files_bucket_name).download_file(
"broker_reference_data/assistance_listing.csv", "usaspending_assistance_listing.csv"
)

script_output["usaspending_file_date"] = str(
usaspending_s3_reso.Bucket(public_files_bucket_name)
.Object("broker_reference_data/assistance_listing.csv")
.last_modified
)

if filecmp.cmp("falextracts_cfda.csv", "usaspending_assistance_listing.csv"):
logger.info("Current USAspending-published version matches falextracts version. No copy needed.")
script_output["published_new_version"] = False
else:
logger.info('New file at {} is being uploaded to "da-public-files"...'.format(al_s3_key))
usaspending_s3_reso.meta.client.upload_file(
"falextracts_cfda.csv", public_files_bucket_name, "broker_reference_data/assistance_listing.csv"
)
logger.info("AssistanceListing file uploaded successfully.")
script_output["published_new_version"] = True

with open("assistance_listing_publish_metrics.json", "w+") as json_out:
logger.info("Writing to assistance_listing_publish_metrics.json")
json.dump(script_output, json_out)
104 changes: 104 additions & 0 deletions dataactcore/scripts/pipeline/extract_usgs_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import logging
import re
import boto3
import urllib.request
import zipfile
import datetime
import argparse
import shutil

logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")

FED_CODES_URL = (
"https://prd-tnm.s3.amazonaws.com/StagedProducts/GeographicNames/FederalCodes/FedCodes_National_Text.zip"
)
GOV_UNITS_URL = (
"https://prd-tnm.s3.amazonaws.com/StagedProducts/GeographicNames/Topical/GovernmentUnits_National_Text.zip"
)
FED_CODES_FILENAME = "FederalCodes_National"
GOV_UNITS_FILENAME = "GovernmentUnits_National"


def download_and_extract_file(file_dir, url, zip_name, file_name):
"""Download the file from the url and extract the txt from the zip

Args:
file_dir: string indicating the directory the file is stored in
url: url to pull from
zip_name: name of zip to unzip
file_name: name of the file to be pulled

Returns:
path to extracted file
"""
file_path = None
subfolder_path = os.path.join(file_dir, "Text")

logging.info("Downloading {}".format(zip_name))
if not os.path.exists(file_dir):
os.makedirs(file_dir)
zip_path = os.path.join(file_dir, zip_name)
urllib.request.urlretrieve(url, zip_path)

logging.info("Extracting zip contents")
with zipfile.ZipFile(zip_path, "r") as zip_file:
zip_file.extractall(file_dir)
os.remove(zip_path)

# Find the file we just downloaded, can't have any other versions of the file in the folder for this to work
logging.info("Finding {}".format(file_name))
for dir_file in os.listdir(subfolder_path):
if re.match(file_name, dir_file):
file_path = os.path.join(subfolder_path, file_name)

return file_path


def get_parser():
parser = argparse.ArgumentParser(description="Get NationalFedCodes file from USGS")
parser.add_argument("--bucket", type=str, default=None, help="S3 bucket to upload it to")
parser.add_argument("--fed-codes", action="store_true", default=False, help="Extract the fed codes file")
parser.add_argument("--gov-units", action="store_true", default=False, help="Extract the gov units file")
return parser


if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
bucket = args.bucket
fed_codes = args.fed_codes
gov_units = args.gov_units

files = {}
if fed_codes:
files[FED_CODES_URL] = FED_CODES_FILENAME
if gov_units:
files[GOV_UNITS_URL] = GOV_UNITS_FILENAME
if not files:
logging.error("Please specify --fed-codes and/or --gov-units")
exit()

start_time = datetime.datetime.now()
logging.info("Starting USGS file retrieval.")

temp_extract_dir = "temp_extract_dir"
for url, filename in files.items():
base_name = url[url.rfind("/") + 1 : url.rfind(".")]
file_name = "{}.txt".format(filename)
file_path = download_and_extract_file(temp_extract_dir, url, "{}.zip".format(base_name), file_name)

if file_path:
if bucket:
logging.info("Uploading {} to S3.".format(file_name))
s3_resource = boto3.resource("s3", region_name="us-gov-west-1")
s3_resource.Object(bucket, "broker_reference_data/{}".format(file_name)).put(Body=open(file_path, "rb"))
os.remove(file_path)
else:
os.rename(file_path, file_name)
else:
logging.info("{} not found.".format(file_name))

shutil.rmtree(temp_extract_dir)

logging.info("USGS file retrieval completed in {} seconds.".format(datetime.datetime.now() - start_time))
17 changes: 14 additions & 3 deletions dataactcore/scripts/pipeline/load_assistance_listing_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import requests
import pandas as pd
import boto3
import time
import sys
from datetime import datetime
Expand Down Expand Up @@ -75,11 +76,21 @@ def load_assistance_listing(base_path, load_local=False, local_file_name="assist
"""
local_now = datetime.now()
if not load_local:
logger.info("Fetching Assistance Listing file from {}".format(S3_ASSISTANCE_LISTING_FILE))
tmp_name = str(time.time()).replace(".", "") + "_assistance_listing.csv"
filename = os.path.join(base_path, tmp_name)
r = requests.get(S3_ASSISTANCE_LISTING_FILE, allow_redirects=True)
open(filename, "wb").write(r.content)

fapc = os.environ.get("fapc", "false") == "true"
if fapc:
s3 = boto3.client("s3")
s3.download_file(
Bucket=CONFIG_BROKER["public_files_bucket"],
Key="broker_reference_data/assistance_listing.csv",
Filename=filename,
)
else:
logger.info("Fetching Assistance Listing file from {}".format(S3_ASSISTANCE_LISTING_FILE))
r = requests.get(S3_ASSISTANCE_LISTING_FILE, allow_redirects=True)
open(filename, "wb").write(r.content)
else:
filename = os.path.join(base_path, local_file_name)
logger.info("Loading assistance listing file: " + filename)
Expand Down
4 changes: 2 additions & 2 deletions dataactcore/scripts/pipeline/load_gtas_boc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def load_all_boc(boc_path=None, force_load=False, aws_prefix="OMB_Extract_BOC"):

with create_app().app_context():
sess = GlobalDB.db().session

boc_list = get_prefixed_file_list(boc_path, aws_prefix, file_extension="csv")
fapc = os.environ.get("fapc", "false") == "true"
boc_list = get_prefixed_file_list(boc_path, aws_prefix, file_extension="csv", signed=not fapc)
boc_re = re.compile(r"OMB_Extract_BOC_(?P<year>\d{4})_(?P<period>\d{2})\.csv")
for boc in boc_list:
# for each BOC file, parse out fiscal year and period and call the BOC loader
Expand Down
3 changes: 2 additions & 1 deletion dataactcore/scripts/pipeline/load_sf133.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def load_all_sf133(
sess = GlobalDB.db().session

# get a list of SF 133 files to load
sf133_list = get_prefixed_file_list(sf133_path, aws_prefix)
fapc = os.environ.get("fapc", "false") == "true"
sf133_list = get_prefixed_file_list(sf133_path, aws_prefix, signed=not fapc)
sf_re = re.compile(r"sf_133_(?P<year>\d{4})_(?P<period>\d{2})\.csv")
for sf133 in sf133_list:
# for each SF file, parse out fiscal year and period and call the SF 133 loader
Expand Down
Loading
Loading