Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Updated the `@cumulus/db/search` module to support searching on nested JSON fields.
- Updated the `@cumulus/db/translate` `translatePostgres*Record*ToApi*Record*` functions to
correctly handle query results from both PostgreSQL and DuckDB.
- **CUMULUS-4428**
- Added add-input-granules task

### Changed

Expand Down
67 changes: 67 additions & 0 deletions tasks/add-input-granules/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# @cumulus/add-input-granules

This task adds a list of granules processed by child ingest executions to its output message. It uses the [Cumulus Python API](https://github.com/nasa/Cumulus-API-Python) to fetch granule information.

## Usage
This task is intended to be used as part of PDR workflows. It is run after all ingest workflows are completed and prior to granule cleanup tasks to which it provides input granules.

### Input

| field name | type | default | required | values | description
| ---------- | ---- | ------- | -------- | ------ | -----------
| pdr | object | N/A | yes | N/A | Product Delivery Record
| pdr.name | string | N/A | yes | N/A | PDR filename
| pdr.path | string | N/A | yes | N/A | PDR location
| pdr.archivePath | string | N/A | yes | N/A | Archived PDR location
| running | array[string] | N/A | yes | N/A | List of queued and running workflow execution ARNS
| completed | array[string] | N/A | yes | N/A | List of completed workflow execution ARNs
| failed | array[object] | N/A | yes | N/A | List of failed workflow ARNs and reason for failure
| failed[].arn | string | N/A | yes | N/A | Failed execution ARN
| failed[].reason | string | N/A | yes | N/A | Reason for workflow failure

### Output

| field name | type | default | required | values | description
| ---------- | ---- | ------- | -------- | ------ | -----------
| pdr | object | N/A | yes | N/A | Product Delivery Record
| pdr.name | string | N/A | yes | N/A | PDR filename
| pdr.path | string | N/A | yes | N/A | PDR location
| pdr.archivePath | string | N/A | yes | N/A | Archived PDR location
| running | array[string] | N/A | yes | N/A | List of queued and running workflow execution ARNS
| completed | array[string] | N/A | yes | N/A | List of completed workflow execution ARNs
| failed | array[object] | N/A | yes | N/A | List of failed workflow ARNs and reason for failure
| failed[].arn | string | N/A | yes | N/A | Failed execution ARN
| failed[].reason | string | N/A | yes | N/A | Reason for workflow failure
| granules | array[object] | N/A | yes | N/A | List of granules
| granules[].granuleId | string | N/A | yes | N/A | Granule ID
| granules[].files | array[object] | N/A | yes | N/A | List of files associated with granule
| granules[].files[].bucket | string | N/A | yes | N/A | Bucket where file is archived in S3
| granules[].files[].checksum | string | N/A | no | N/A | Checksum value for file
| granules[].files[].fileName | string | N/A | no | N/A | Name of file (e.g. file.txt)
| granules[].files[].key | string | N/A | yes | N/A | S3 Key for archived file
| granules[].files[].size | integer | N/A | no | N/A | Size of file (in bytes)
| granules[].files[].source | string | N/A | no | N/A | Source URI of the file from origin system (e.g. S3, FTP, HTTP)
| granules[].files[].type | string | N/A | no | N/A | Type of file (e.g. data, metadata, browse)

### Example workflow configuration and use


### Internal Dependencies

This task uses the Cumulus Private API Lambda via the Cumulus Python API and requires the `PRIVATE_API_LAMBDA_ARN` environment variable to be set.

### External Dependencies

- https://github.com/nasa/Cumulus-API-Python
- https://github.com/nasa/cumulus-message-adapter
- https://github.com/nasa/cumulus-message-adapter-python

## Contributing

To make a contribution, please [see our Cumulus contributing guidelines](https://github.com/nasa/cumulus/blob/master/CONTRIBUTING.md) and our documentation on [adding a task](https://nasa.github.io/cumulus/docs/adding-a-task)

## About Cumulus

Cumulus is a cloud-based data ingest, archive, distribution and management prototype for NASA's future Earth science data streams.

[Cumulus Documentation](https://nasa.github.io/cumulus)
31 changes: 31 additions & 0 deletions tasks/add-input-granules/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"name": "@cumulus/add-input-granules-task",
"private": true,
"version": "21.3.1",
"description": "Adds input granules to the output message using execution ARNs",
"main": "index.js",
"homepage": "https://github.com/nasa/cumulus/tree/master/tasks/add-input-granules",
"repository": {
"type": "git",
"url": "https://github.com/nasa/cumulus"
},
"scripts": {
"test": "uv run pytest -q",
"lint": "uv run ruff check . && uv run ruff format --diff . && uv run mypy src",
"clean": "rm -rf dist && rm -rf .venv && mkdir -p dist/packages",
"build": "uv sync",
"prepare": "npm run build",
"package": "npm run clean && cp -r ./schemas ./dist/packages && ../../bin/package_uv.sh 3.12 ${PWD}",
"install-python-deps": "npm run build"
},
"publishConfig": {
"access": "restricted"
},
"nyc": {
"exclude": [
"tests"
]
},
"author": "Cumulus Authors",
"license": "Apache-2.0"
}
29 changes: 29 additions & 0 deletions tasks/add-input-granules/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[project]
name = "add-input-granules"
version = "21.3.1"
description = "Adds input granules to the output message using execution ARNs"
readme = "README.md"
requires-python = "~=3.12.0"
dependencies = [
"cumulus-api",
"cumulus-message-adapter-python>=2.4.0",
]

[dependency-groups]
dev = [
"mypy~=1.19",
"pytest~=9.0",
"ruff~=0.14.0",
]

[tool.pytest]
minversion = "9.0"
pythonpath = [
"src",
]
testpaths = [
"tests",
]

[tool.uv.sources]
cumulus-api = { git = "https://github.com/nasa/Cumulus-API-Python", tag = "v3.1.0" }
71 changes: 71 additions & 0 deletions tasks/add-input-granules/schemas/input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{
"title": "AddInputGranulesInput",
"description": "Describes the input expected by the add-input-granules task",
"type": "object",
"required": [
"pdr",
"running",
"completed",
"failed"
],
"properties": {
"pdr": {
"description": "Product Delivery Record",
"type": "object",
"required": [
"name",
"path",
"archivePath"

@adtisdal-ASDC adtisdal-ASDC Mar 4, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to leave archivePath off as required. Since it's not necessary for this task and I'm not sure if the pdr object will always have it, but I don't think it hurts leaving it as required

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was on the fence on whether or not it should be marked as required since I lacked some context on whether subsequent PDR tasks would break if it wasn't present for pass through. I went ahead and removed it as a required field.

],
"properties": {
"name": {
"description": "Filename of the PDR",
"type": "string"
},
"path": {
"description": "Location of the PDR",
"type": "string"
},
"archivePath": {
"description": "Archive path of the PDR",
"type": "string"
}
}
},
"running": {
"description": "List of execution ARNs which are queued or running",
"type": "array",
"items": {
"type": "string"
}
},
"completed": {
"description": "List of completed execution ARNs",
"type": "array",
"items": {
"type": "string"
}
},
"failed": {
"description": "List of failed execution ARNs with reason for failure",
"type": "array",
"items": {
"type": "object",
"required": [
"arn",
"reason"
],
"properties": {
"arn": {
"description": "ARN of the failed execution",
"type": "string"
},
"reason": {
"description": "Reason for the failed execution",
"type": "string"
}
}
}
}
}
}
128 changes: 128 additions & 0 deletions tasks/add-input-granules/schemas/output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
{
"title": "AddInputGranulesOutput",
"description": "Describes the output produced by the add-input-granules task",
"type": "object",
"required": [
"pdr",
"running",
"completed",
"failed",
"granules"
],
"properties": {
"pdr": {
"description": "Product Delivery Record",
"type": "object",
"required": [
"name",
"path"
],
"properties": {
"name": {
"description": "Filename of the PDR",
"type": "string"
},
"path": {
"description": "Location of the PDR",
"type": "string"
}
}
},
"running": {
"description": "List of execution ARNs which are queued or running",
"type": "array",
"items": {
"type": "string"
}
},
"completed": {
"description": "List of completed execution ARNs",
"type": "array",
"items": {
"type": "string"
}
},
"failed": {
"description": "List of failed execution ARNs with reason for failure",
"type": "array",
"items": {
"type": "object",
"required": [
"arn",
"reason"
],
"properties": {
"arn": {
"description": "ARN of the failed execution",
"type": "string"
},
"reason": {
"description": "Reason for the failed execution",
"type": "string"
}
}
}
},
"granules": {
"type": "array",
"description": "List of granules added by the add-input-granules task.",
"items": {
"type": "object",
"required": [
"granuleId",
"files"
],
"properties": {
"granuleId": {
"type": "string"
},
"files": {
"type": "array",
"items": {
"additionalProperties": false,
"type": "object",
"required": [
"bucket",
"key"
],
"properties": {
"bucket": {
"description": "Bucket where file is archived in S3",
"type": "string"
},
"checksum": {
"description": "Checksum value for file",
"type": "string"
},
"checksumType": {
"description": "Type of checksum (e.g. md5, sha256, etc)",
"type": "string"
},
"fileName": {
"description": "Name of file (e.g. file.txt)",
"type": "string"
},
"key": {
"description": "S3 Key for archived file",
"type": "string"
},
"size": {
"description": "Size of file (in bytes)",
"type": "number"
},
"source": {
"description": "Source URI of the file from origin system (e.g. S3, FTP, HTTP)",
"type": "string"
},
"type": {
"description": "Type of file (e.g. data, metadata, browse)",
"type": "string"
}
}
}
}
}
}
}
}
}
54 changes: 54 additions & 0 deletions tasks/add-input-granules/src/add_input_granules/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Task for adding granules cleanup input."""

import json
import logging

from cumulus_api import CumulusApi
from run_cumulus_task import run_cumulus_task

logger = logging.getLogger()
Comment thread
ccollinsuah marked this conversation as resolved.
Outdated
logger.setLevel(logging.INFO)


def _get_granules_from_exc(executions: list) -> list:
"""Get a list of granules from ingest executions using Cumulus API.

:param executions: List of execution ARNs or execution objects.
"""
cml = CumulusApi
Comment thread
ccollinsuah marked this conversation as resolved.
Outdated
input_granules = []
for exc in executions:
arn = exc["arn"] if isinstance(exc, dict) else exc
exc_obj = cml.get_execution(arn)
granules = exc_obj["finalPayload"].get("granules", [])
for granule in granules:
input_granules.append(granule)
logger.info("INPUT GRANULES \n" + json.dumps(input_granules))
return input_granules


def add_input_granules(event: dict, _context: dict) -> dict:
"""Add list of input granules to task output.

:param event: Lambda event object.
:param _context: Lambda context object, unused.
"""
logger.info("## EVENT OBJ \n" + json.dumps(event))

# Executions in running state might not have started yet
if len(event["input"]["running"]) > 0:
raise Exception("Executions still running")
if len(event["input"]["failed"]) > 0:
raise Exception("Some executions failed")

event["input"]["granules"] = _get_granules_from_exc(event["input"]["completed"])
return event["input"]


def handler(event: dict, _context: dict) -> dict:
"""Lambda handler for the task using CMA.

:param event: Lambda event object in Cumulus message format.
:param _context: Lambda context object.
"""
return run_cumulus_task(add_input_granules, event, _context)
Loading