Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/airflow:slim-2.7.0-python3.10
FROM apache/airflow:slim-3.0.2-python3.12

USER root
# `apt-get autoremove` is used to remove packages that were automatically installed to satisfy
Expand All @@ -16,7 +16,7 @@ COPY --chown=airflow:airflow requirements.txt "${AIRFLOW_HOME}/requirements.txt"
USER airflow

RUN pip install --upgrade pip \
&& pip install --no-cache-dir -r requirements.txt -c "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.10.txt"
&& pip install --no-cache-dir -r requirements.txt -c "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.2/constraints-3.12.txt"

COPY --chown=airflow:airflow dags "${AIRFLOW_HOME}/dags"
COPY --chown=airflow:airflow plugins "${AIRFLOW_HOME}/plugins"
Expand Down
94 changes: 94 additions & 0 deletions infrastructure/ecs_services/airflow_dag_processor.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# CloudWatch log group that the dag-processor container writes to through the
# awslogs driver configured in its task definition.
resource "aws_cloudwatch_log_group" "airflow_dag_processor" {
  # Log events are retained for one day only.
  retention_in_days = 1
  name_prefix       = "/${var.prefix}-sm2a/airflow-dag-processor/"
}

# Fargate task definition for the Airflow DAG processor: a single container
# built from the shared Airflow ECR image and started with the `dag-processor`
# command.
resource "aws_ecs_task_definition" "airflow_dag_processor" {
  family                   = "${var.prefix}-dag-processor"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"

  # Task-level sizing; the single container below is given the same values.
  cpu    = 512
  memory = 1024

  task_role_arn      = aws_iam_role.airflow_task.arn
  execution_role_arn = aws_iam_role.ecs_task_execution_role.arn

  runtime_platform {
    cpu_architecture        = var.task_cpu_architecture
    operating_system_family = "LINUX"
  }

  container_definitions = jsonencode([
    {
      name      = "dag-processor"
      image     = "${aws_ecr_repository.airflow.repository_url}:latest"
      command   = ["dag-processor"]
      user      = "50000:0"
      essential = true
      cpu       = 512
      memory    = 1024

      linuxParameters = {
        initProcessEnabled = true
      }

      # `$${HOSTNAME}` is Terraform's escape for a literal `${HOSTNAME}`, so the
      # variable is expanded by the shell inside the container, not by Terraform.
      healthcheck = {
        command = [
          "CMD-SHELL",
          "airflow jobs check --job-type DagProcessorJob --hostname \"$${HOSTNAME}\""
        ]
        interval = 35
        timeout  = 30
        retries  = 5
      }

      # Shared Airflow environment plus SERVICES_HASH.
      # NOTE(review): SERVICES_HASH presumably changes whenever the service
      # sources change so that a new task revision is registered - confirm
      # against the definition of local.services_hashes.
      environment = concat(
        var.airflow_task_common_environment,
        [
          {
            name  = "SERVICES_HASH"
            value = join(",", local.services_hashes)
          }
        ]
      )

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          awslogs-group         = aws_cloudwatch_log_group.airflow_dag_processor.name
          awslogs-region        = var.aws_region
          awslogs-stream-prefix = "airflow-dag-processor"
        }
      }
    }
  ])

  # The image must be built and the repository must exist before registering.
  depends_on = [null_resource.build_ecr_image, aws_ecr_repository.airflow]
}

# Security group attached to the dag-processor service. It declares no ingress
# rules and a single egress rule that allows all outbound traffic.
resource "aws_security_group" "airflow_dag_processor_service" {
  vpc_id      = var.vpc_id
  name        = "${var.prefix}-dag-processor"
  description = "Deny all incoming traffic"

  # protocol "-1" with ports 0-0 means every protocol and port.
  egress {
    protocol    = "-1"
    from_port   = 0
    to_port     = 0
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# ECS service that keeps one dag-processor task running on Fargate in the
# private subnets, without a public IP.
resource "aws_ecs_service" "airflow_dag_processor" {
  name    = "${var.prefix}-dag-processor"
  cluster = aws_ecs_cluster.airflow.arn

  # The task definition is referenced by family name rather than by a
  # revision-specific ARN.
  task_definition = aws_ecs_task_definition.airflow_dag_processor.family

  launch_type         = "FARGATE"
  platform_version    = "1.4.0"
  scheduling_strategy = "REPLICA"
  desired_count       = 1

  # Rolling deployment settings: minimum healthy 100 %, maximum 200 %.
  deployment_minimum_healthy_percent = 100
  deployment_maximum_percent         = 200
  force_new_deployment               = var.force_new_ecs_service_deployment

  deployment_controller {
    type = "ECS"
  }

  enable_execute_command = true

  network_configuration {
    security_groups  = [aws_security_group.airflow_dag_processor_service.id]
    subnets          = var.private_subnet_ids
    assign_public_ip = false
  }

  depends_on = [null_resource.build_ecr_image, aws_ecr_repository.airflow]
}
8 changes: 4 additions & 4 deletions infrastructure/ecs_services/airflow_server.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ resource "aws_ecs_task_definition" "airflow_webserver" {
requires_compatibilities = ["FARGATE"]
container_definitions = jsonencode([
{
name = "webserver"
name = "api-server"
image = join(":", [aws_ecr_repository.airflow.repository_url, "latest"])
cpu = 1024
memory = 2048
Expand All @@ -36,7 +36,7 @@ resource "aws_ecs_task_definition" "airflow_webserver" {
"CMD",
"curl",
"--fail",
"http://localhost:8080/health"
"http://localhost:8080/api/v2/monitor/health"
]
interval = 35
timeout = 30
Expand All @@ -46,7 +46,7 @@ resource "aws_ecs_task_definition" "airflow_webserver" {
initProcessEnabled = true
}
essential = true
command = ["webserver"]
command = ["api-server"]
environment = concat(var.airflow_task_common_environment,
[
{
Expand Down Expand Up @@ -113,7 +113,7 @@ resource "aws_ecs_service" "airflow_webserver" {
scheduling_strategy = "REPLICA"
load_balancer {
target_group_arn = aws_alb_target_group.ecs-app-target-group.arn # .airflow_webserver.arn
container_name = "webserver"
container_name = "api-server"
container_port = 8080
}
# Update from services folder
Expand Down
94 changes: 94 additions & 0 deletions infrastructure/ecs_services/airflow_triggerer.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# CloudWatch log group that the triggerer container writes to through the
# awslogs driver configured in its task definition.
resource "aws_cloudwatch_log_group" "airflow_triggerer" {
  # Log events are retained for one day only.
  retention_in_days = 1
  name_prefix       = "/${var.prefix}-sm2a/airflow-triggerer/"
}

# Fargate task definition for the Airflow triggerer: a single container built
# from the shared Airflow ECR image and started with the `triggerer` command.
resource "aws_ecs_task_definition" "airflow_triggerer" {
  family                   = "${var.prefix}-triggerer"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"

  # Task-level sizing comes from module variables; the single container below
  # is given the same values.
  cpu    = var.triggerer_cpu
  memory = var.triggerer_memory

  task_role_arn      = aws_iam_role.airflow_task.arn
  execution_role_arn = aws_iam_role.ecs_task_execution_role.arn

  runtime_platform {
    cpu_architecture        = var.task_cpu_architecture
    operating_system_family = "LINUX"
  }

  container_definitions = jsonencode([
    {
      name      = "triggerer"
      image     = "${aws_ecr_repository.airflow.repository_url}:latest"
      command   = ["triggerer"]
      user      = "50000:0"
      essential = true
      cpu       = var.triggerer_cpu
      memory    = var.triggerer_memory

      linuxParameters = {
        initProcessEnabled = true
      }

      # `$${HOSTNAME}` is Terraform's escape for a literal `${HOSTNAME}`, so the
      # variable is expanded by the shell inside the container, not by Terraform.
      healthcheck = {
        command = [
          "CMD-SHELL",
          "airflow jobs check --job-type TriggererJob --hostname \"$${HOSTNAME}\""
        ]
        interval = 35
        timeout  = 30
        retries  = 5
      }

      # Shared Airflow environment plus SERVICES_HASH.
      # NOTE(review): SERVICES_HASH presumably changes whenever the service
      # sources change so that a new task revision is registered - confirm
      # against the definition of local.services_hashes.
      environment = concat(
        var.airflow_task_common_environment,
        [
          {
            name  = "SERVICES_HASH"
            value = join(",", local.services_hashes)
          }
        ]
      )

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          awslogs-group         = aws_cloudwatch_log_group.airflow_triggerer.name
          awslogs-region        = var.aws_region
          awslogs-stream-prefix = "airflow-triggerer"
        }
      }
    }
  ])

  # The image must be built and the repository must exist before registering.
  depends_on = [null_resource.build_ecr_image, aws_ecr_repository.airflow]
}

# Security group attached to the triggerer service. It declares no ingress
# rules and a single egress rule that allows all outbound traffic.
resource "aws_security_group" "airflow_triggerer_service" {
  vpc_id      = var.vpc_id
  name        = "${var.prefix}-triggerer"
  description = "Deny all incoming traffic"

  # protocol "-1" with ports 0-0 means every protocol and port.
  egress {
    protocol    = "-1"
    from_port   = 0
    to_port     = 0
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# ECS service that keeps one triggerer task running on Fargate in the private
# subnets, without a public IP.
resource "aws_ecs_service" "airflow_triggerer" {
  name    = "${var.prefix}-triggerer"
  cluster = aws_ecs_cluster.airflow.arn

  # The task definition is referenced by family name rather than by a
  # revision-specific ARN.
  task_definition = aws_ecs_task_definition.airflow_triggerer.family

  launch_type         = "FARGATE"
  platform_version    = "1.4.0"
  scheduling_strategy = "REPLICA"
  desired_count       = 1

  # Rolling deployment settings: minimum healthy 100 %, maximum 200 %.
  deployment_minimum_healthy_percent = 100
  deployment_maximum_percent         = 200
  force_new_deployment               = var.force_new_ecs_service_deployment

  deployment_controller {
    type = "ECS"
  }

  enable_execute_command = true

  network_configuration {
    security_groups  = [aws_security_group.airflow_triggerer_service.id]
    subnets          = var.private_subnet_ids
    assign_public_ip = false
  }

  depends_on = [null_resource.build_ecr_image, aws_ecr_repository.airflow]
}
4 changes: 4 additions & 0 deletions infrastructure/ecs_services/airflow_worker.tf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ resource "aws_ecs_task_definition" "airflow_worker" {
name = "DUMB_INIT_SETSID"
value = "0"
},
{
name = "AIRFLOW__CORE__TASK_EXECUTION_TOKEN_URL"
value = "https://${lower(local.subdomain)}.${var.domain_name}/api/v2/execution/"
},
{
name = "WORKER_HASHES"
value = join(",", local.workers_hashes)
Expand Down
2 changes: 1 addition & 1 deletion infrastructure/ecs_services/alb.tf
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ resource "aws_alb_target_group" "ecs-app-target-group" {
target_type = "ip"
health_check {
enabled = true
path = "/health"
path = "/api/v2/monitor/health"
# Note: 'interval' must be greater than 'timeout'
interval = 30
timeout = 10
Expand Down
1 change: 1 addition & 0 deletions infrastructure/ecs_services/iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ resource "aws_iam_policy" "secret_manager_read_secret" {
Effect = "Allow"
Resource = [
var.fernet_key_ssm_arn,
var.jwt_secret_ssm_arn,
var.sql_alchemy_conn_ssm_arn,
var.celery_result_backend_ssm_arn
]
Expand Down
8 changes: 5 additions & 3 deletions infrastructure/ecs_services/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ output "airflow_url" {
}

output "allowed_security_groups_id" {
value = tolist([aws_security_group.airflow_webserver_service.id,
value = tolist([
aws_security_group.airflow_webserver_service.id,
aws_security_group.airflow_metrics_service.id,
aws_security_group.airflow_standalone_task.id,
aws_security_group.airflow_scheduler_service.id,
aws_security_group.airflow_worker_service.id

aws_security_group.airflow_worker_service.id,
aws_security_group.airflow_dag_processor_service.id,
aws_security_group.airflow_triggerer_service.id,
])
}
output "worker_security_group_id" {
Expand Down
11 changes: 11 additions & 0 deletions infrastructure/ecs_services/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ variable "airflow_task_common_environment" {
# Allow ECS services to read secrets from AWS Secret Manager.
variable "fernet_key_ssm_arn" {

}

# ARN of the secret that stores the Airflow API-auth JWT secret. It is listed
# in the IAM policy that lets the ECS tasks read their configuration secrets.
variable "jwt_secret_ssm_arn" {
  description = "ARN of the Secrets Manager secret holding the Airflow JWT secret; ECS tasks are granted read access to it."
  type        = string
}
variable "sql_alchemy_conn_ssm_arn" {
}
Expand Down Expand Up @@ -121,4 +124,12 @@ variable "alb_access_logs_prefix" {
description = "S3 key prefix for ALB access logs."
type = string
default = null
}

# Required: there is no default, so every caller of this module must set it.
variable "triggerer_cpu" {
  description = "CPU units assigned to the triggerer ECS task and to its container (1024 units = 1 vCPU)."
  type        = number
}

# Required: there is no default, so every caller of this module must set it.
variable "triggerer_memory" {
  description = "Memory, in MiB, assigned to the triggerer ECS task and to its container."
  type        = number
}
6 changes: 5 additions & 1 deletion infrastructure/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ locals {

airflow_task_common_environment = concat(var.extra_airflow_task_common_environment, [
{
name = "AIRFLOW__WEBSERVER__INSTANCE_NAME"
name = "AIRFLOW__API_SERVER__INSTANCE_NAME"
value = "${var.prefix}-${var.project}"
},
{
Expand Down Expand Up @@ -42,6 +42,10 @@ locals {
name = "AIRFLOW__CORE__FERNET_KEY_SECRET"
value = substr(module.secrets.fernet_key_name, length(var.prefix) + 16, -1)
},
{
name = "AIRFLOW__API_AUTH__JWT_SECRET_SECRET"
value = substr(module.secrets.jwt_secret_name, length(var.prefix) + 16, -1)
},
{
name = "AIRFLOW__CELERY__RESULT_BACKEND_SECRET"
value = substr(module.secrets.celery_result_backend_name, length(var.prefix) + 16, -1)
Expand Down
14 changes: 14 additions & 0 deletions infrastructure/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ module "sqs_queue" {
prefix = var.prefix
}

# Fallback JWT secret: created only when the caller leaves var.jwt_secret null.
resource "random_password" "jwt_secret_generated" {
  count = var.jwt_secret == null ? 1 : 0

  # 64 characters with special characters disabled.
  special = false
  length  = 64
}

locals {
  # JWT secret actually handed to the secrets module: the generated password
  # when var.jwt_secret is null, otherwise the caller-supplied value.
  effective_jwt_secret = var.jwt_secret == null ? random_password.jwt_secret_generated[0].result : var.jwt_secret
}



module "database" {
Expand Down Expand Up @@ -39,6 +49,7 @@ module "secrets" {
db_port = var.airflow_db.port
db_username = var.airflow_db.username
fernet_key = var.fernet_key
jwt_secret = local.effective_jwt_secret
prefix = var.prefix
airflow_admin_username = var.airflow_admin_username
airflow_admin_password = var.airflow_admin_password
Expand Down Expand Up @@ -79,6 +90,7 @@ module "ecs_services" {
airflow_bucket_arn = data.aws_s3_bucket.airflow_bucket.arn
celery_result_backend_ssm_arn = module.secrets.celery_result_backend_arn
fernet_key_ssm_arn = module.secrets.fernet_key_arn
jwt_secret_ssm_arn = module.secrets.jwt_secret_arn
permission_boundaries_arn = var.permission_boundaries_arn
sql_alchemy_conn_ssm_arn = module.secrets.sql_alchemy_conn_arn
sqs_arns_list = concat(var.sqs_arns_list, [module.sqs_queue.celery_broker_arn])
Expand All @@ -101,6 +113,8 @@ module "ecs_services" {
task_cpu_architecture = var.task_cpu_architecture
alb_access_logs_bucket = var.alb_access_logs_bucket
alb_access_logs_prefix = var.alb_access_logs_prefix
triggerer_cpu = var.triggerer_cpu
triggerer_memory = var.triggerer_memory
}

resource "null_resource" "airflow_create_airflow_user" {
Expand Down
9 changes: 9 additions & 0 deletions infrastructure/secrets/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ resource "aws_secretsmanager_secret_version" "fernet_key" {
secret_string = var.fernet_key
}

# Secrets Manager secret that holds the Airflow JWT secret.
# The name lives under "<prefix>/airflow/config/", the same path segment used
# by the other Airflow config secrets in this module.
# NOTE(review): presumably this is the path the Airflow secrets backend reads
# config options from - confirm against the backend configuration.
resource "aws_secretsmanager_secret" "jwt_secret" {
name_prefix = "${var.prefix}/airflow/config/jwt_secret/"
}

# Writes the value of var.jwt_secret as a version of the JWT secret above.
resource "aws_secretsmanager_secret_version" "jwt_secret" {
  secret_string = var.jwt_secret
  secret_id     = aws_secretsmanager_secret.jwt_secret.id
}

# Store core.sql_alchemy_conn setting for consumption by airflow SecretsManagerBackend.
# The config options must follow the config prefix naming convention defined within the secrets backend.
# This means that sql_alchemy_conn is not defined with a connection prefix, but with "config" prefix.
Expand Down
Loading
Loading