Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions dataactbroker/helpers/script_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,15 @@ def _flatten(list_item, name=""):
return out


def get_prefixed_file_list(file_path, aws_prefix, bucket_name="sf_133_bucket", file_extension="csv"):
def get_prefixed_file_list(file_path, aws_prefix, bucket_name="sf_133_bucket", file_extension="csv", signed=True):
"""Get a list of files starting with the given prefix

Args:
file_path: path to where files are stored
aws_prefix: prefix to filter which files to pull from AWS
bucket_name: name of the bucket from which to pull the files
file_extension: the extension of the files to look for
signed: whether to pull presigned S3 urls or not

Returns:
A list of tuples containing information about existing
Expand All @@ -285,13 +286,24 @@ def get_prefixed_file_list(file_path, aws_prefix, bucket_name="sf_133_bucket", f
if CONFIG_BROKER["use_aws"]:
# get list of prefixed files in the config bucket on S3
s3_client = boto3.client("s3", region_name=CONFIG_BROKER["aws_region"])
response = s3_client.list_objects_v2(Bucket=CONFIG_BROKER[bucket_name], Prefix=aws_prefix)
file_list = []
for obj in response.get("Contents", []):
file_url = s3_client.generate_presigned_url(
"get_object", {"Bucket": CONFIG_BROKER[bucket_name], "Key": obj["Key"]}, ExpiresIn=600
)
file_list.append(FileInfo(file_url, obj["Key"]))

if signed:
response = s3_client.list_objects_v2(Bucket=CONFIG_BROKER[bucket_name], Prefix=aws_prefix)
file_list = []
for obj in response.get("Contents", []):
file_url = s3_client.generate_presigned_url(
"get_object", {"Bucket": CONFIG_BROKER[bucket_name], "Key": obj["Key"]}, ExpiresIn=600
)
file_list.append(FileInfo(file_url, obj["Key"]))
else:
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=CONFIG_BROKER[bucket_name], Prefix=aws_prefix)
file_list = []
for page in pages:
for obj in page.get("Contents", []):
key = obj["Key"]
s3_object = s3_client.get_object(Bucket=CONFIG_BROKER[bucket_name], Key=key)
file_list.append(FileInfo(s3_object, key))
else:
file_list = []

Expand Down
1 change: 1 addition & 0 deletions dataactcore/broker_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def deep_merge(left, right):
}


# TODO: Not logging correctly in either remote environment
def configure_logging(service_name="broker"):
config = DEFAULT_CONFIG
if "python_config" in CONFIG_LOGGING:
Expand Down
90 changes: 90 additions & 0 deletions dataactcore/scripts/pipeline/assistance_listing_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import boto3
from botocore.handlers import disable_signing
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
import filecmp
import json
import argparse
import logging

logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger()


# Purpose of this script is to check the 'falextracts' AssistanceListing source against our current version, and update
# if necessary. The AssistanceListing source is updated on Saturdays using the ASSISTANCE_LISTING_FILE_FORMAT
# as the path (we do not have access to List anything in the bucket, so the script will break if the source file
# format changes.)

ASSISTANCE_LISTING_FILE_FORMAT = (
"Assistance Listings/usaspendinggov/%Y/%m-%b/AssistanceListings_USASpendingGov_PUBLIC_WEEKLY_%Y%m%d.csv"
)


def get_parser():
parser = argparse.ArgumentParser(description="Get NationalFedCodes file from USGS")
parser.add_argument("--bucket", "-b", type=str, required=True, help="public bucket to download from/upload to")
return parser


if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()

script_output = {}

s3_reso = boto3.resource("s3", region_name="us-east-1")
public_files_bucket_name = args.bucket
fal_extracts_bucket_name = "falextracts"

# Ignore access keys...
s3_reso.meta.client.meta.events.register("choose-signer.s3.*", disable_signing)
falextracts_bucket = s3_reso.Bucket(fal_extracts_bucket_name)

# Calculate object name to pull from falextracts
today = datetime.now()

if today.strftime("%A") == "Saturday":
# Check if today's file exists yet, if not, try last week's...
al_s3_key = today.strftime(ASSISTANCE_LISTING_FILE_FORMAT)
try:
falextracts_bucket.Object(al_s3_key).load()
except ClientError:
logger.info("File at '{}'' not found. Trying last week's file...".format(al_s3_key))
al_s3_key = (today - timedelta(days=7)).strftime(ASSISTANCE_LISTING_FILE_FORMAT)

else:
# Get "last Saturday" by modulo
al_s3_key = (today - timedelta(days=((today.isoweekday() + 1) % 7))).strftime(ASSISTANCE_LISTING_FILE_FORMAT)

logger.info("Attempting to download falextracts AssistanceListing file '{}'...".format(al_s3_key))
falextracts_bucket.download_file(al_s3_key, "falextracts_cfda.csv")
script_output["source_file_path"] = al_s3_key
script_output["source_file_date"] = str(falextracts_bucket.Object(al_s3_key).last_modified)

logger.info("Downloading current USAspending-published AssistanceListing file...")
usaspending_s3_reso = boto3.resource("s3", region_name="us-gov-west-1")
usaspending_s3_reso.Bucket(public_files_bucket_name).download_file(
"broker_reference_data/assistance_listing.csv", "usaspending_assistance_listing.csv"
)

script_output["usaspending_file_date"] = str(
usaspending_s3_reso.Bucket(public_files_bucket_name)
.Object("broker_reference_data/assistance_listing.csv")
.last_modified
)

if filecmp.cmp("falextracts_cfda.csv", "usaspending_assistance_listing.csv"):
logger.info("Current USAspending-published version matches falextracts version. No copy needed.")
script_output["published_new_version"] = False
else:
logger.info('New file at {} is being uploaded to "da-public-files"...'.format(al_s3_key))
usaspending_s3_reso.meta.client.upload_file(
"falextracts_cfda.csv", public_files_bucket_name, "broker_reference_data/assistance_listing.csv"
)
logger.info("AssistanceListing file uploaded successfully.")
script_output["published_new_version"] = True

with open("assistance_listing_publish_metrics.json", "w+") as json_out:
logger.info("Writing to assistance_listing_publish_metrics.json")
json.dump(script_output, json_out)
104 changes: 104 additions & 0 deletions dataactcore/scripts/pipeline/extract_usgs_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import logging
import re
import boto3
import urllib.request
import zipfile
import datetime
import argparse
import shutil

logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")

FED_CODES_URL = (
"https://prd-tnm.s3.amazonaws.com/StagedProducts/GeographicNames/FederalCodes/FedCodes_National_Text.zip"
)
GOV_UNITS_URL = (
"https://prd-tnm.s3.amazonaws.com/StagedProducts/GeographicNames/Topical/GovernmentUnits_National_Text.zip"
)
FED_CODES_FILENAME = "FederalCodes_National"
GOV_UNITS_FILENAME = "GovernmentUnits_National"


def download_and_extract_file(file_dir, url, zip_name, file_name):
"""Download the file from the url and extract the txt from the zip

Args:
file_dir: string indicating the directory the file is stored in
url: url to pull from
zip_name: name of zip to unzip
file_name: name of the file to be pulled

Returns:
path to extracted file
"""
file_path = None
subfolder_path = os.path.join(file_dir, "Text")

logging.info("Downloading {}".format(zip_name))
if not os.path.exists(file_dir):
os.makedirs(file_dir)
zip_path = os.path.join(file_dir, zip_name)
urllib.request.urlretrieve(url, zip_path)

logging.info("Extracting zip contents")
with zipfile.ZipFile(zip_path, "r") as zip_file:
zip_file.extractall(file_dir)
os.remove(zip_path)

# Find the file we just downloaded, can't have any other versions of the file in the folder for this to work
logging.info("Finding {}".format(file_name))
for dir_file in os.listdir(subfolder_path):
if re.match(file_name, dir_file):
file_path = os.path.join(subfolder_path, file_name)

return file_path


def get_parser():
parser = argparse.ArgumentParser(description="Get NationalFedCodes file from USGS")
parser.add_argument("--bucket", type=str, default=None, help="S3 bucket to upload it to")
parser.add_argument("--fed-codes", action="store_true", default=False, help="Extract the fed codes file")
parser.add_argument("--gov-units", action="store_true", default=False, help="Extract the gov units file")
return parser


if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
bucket = args.bucket
fed_codes = args.fed_codes
gov_units = args.gov_units

files = {}
if fed_codes:
files[FED_CODES_URL] = FED_CODES_FILENAME
if gov_units:
files[GOV_UNITS_URL] = GOV_UNITS_FILENAME
if not files:
logging.error("Please specify --fed-codes and/or --gov-units")
exit()

start_time = datetime.datetime.now()
logging.info("Starting USGS file retrieval.")

temp_extract_dir = "temp_extract_dir"
for url, filename in files.items():
base_name = url[url.rfind("/") + 1 : url.rfind(".")]
file_name = "{}.txt".format(filename)
file_path = download_and_extract_file(temp_extract_dir, url, "{}.zip".format(base_name), file_name)

if file_path:
if bucket:
logging.info("Uploading {} to S3.".format(file_name))
s3_resource = boto3.resource("s3", region_name="us-gov-west-1")
s3_resource.Object(bucket, "broker_reference_data/{}".format(file_name)).put(Body=open(file_path, "rb"))
os.remove(file_path)
else:
os.rename(file_path, file_name)
else:
logging.info("{} not found.".format(file_name))

shutil.rmtree(temp_extract_dir)

logging.info("USGS file retrieval completed in {} seconds.".format(datetime.datetime.now() - start_time))
17 changes: 14 additions & 3 deletions dataactcore/scripts/pipeline/load_assistance_listing_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import requests
import pandas as pd
import boto3
import time
import sys
from datetime import datetime
Expand Down Expand Up @@ -75,11 +76,21 @@ def load_assistance_listing(base_path, load_local=False, local_file_name="assist
"""
local_now = datetime.now()
if not load_local:
logger.info("Fetching Assistance Listing file from {}".format(S3_ASSISTANCE_LISTING_FILE))
tmp_name = str(time.time()).replace(".", "") + "_assistance_listing.csv"
filename = os.path.join(base_path, tmp_name)
r = requests.get(S3_ASSISTANCE_LISTING_FILE, allow_redirects=True)
open(filename, "wb").write(r.content)

fapc = os.environ.get("fapc", "false") == "true"
if fapc:
s3 = boto3.client("s3")
s3.download_file(
Bucket=CONFIG_BROKER["public_files_bucket"],
Key="broker_reference_data/assistance_listing.csv",
Filename=filename,
)
else:
logger.info("Fetching Assistance Listing file from {}".format(S3_ASSISTANCE_LISTING_FILE))
r = requests.get(S3_ASSISTANCE_LISTING_FILE, allow_redirects=True)
open(filename, "wb").write(r.content)
else:
filename = os.path.join(base_path, local_file_name)
logger.info("Loading assistance listing file: " + filename)
Expand Down
4 changes: 2 additions & 2 deletions dataactcore/scripts/pipeline/load_gtas_boc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def load_all_boc(boc_path=None, force_load=False, aws_prefix="OMB_Extract_BOC"):

with create_app().app_context():
sess = GlobalDB.db().session

boc_list = get_prefixed_file_list(boc_path, aws_prefix, file_extension="csv")
fapc = os.environ.get("fapc", "false") == "true"
boc_list = get_prefixed_file_list(boc_path, aws_prefix, file_extension="csv", signed=not fapc)
boc_re = re.compile(r"OMB_Extract_BOC_(?P<year>\d{4})_(?P<period>\d{2})\.csv")
for boc in boc_list:
# for each BOC file, parse out fiscal year and period and call the BOC loader
Expand Down
3 changes: 2 additions & 1 deletion dataactcore/scripts/pipeline/load_sf133.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def load_all_sf133(
sess = GlobalDB.db().session

# get a list of SF 133 files to load
sf133_list = get_prefixed_file_list(sf133_path, aws_prefix)
fapc = os.environ.get("fapc", "false") == "true"
sf133_list = get_prefixed_file_list(sf133_path, aws_prefix, signed=not fapc)
sf_re = re.compile(r"sf_133_(?P<year>\d{4})_(?P<period>\d{2})\.csv")
for sf133 in sf133_list:
# for each SF file, parse out fiscal year and period and call the SF 133 loader
Expand Down
Loading
Loading