Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ Creates a Cloud Scheduler job on GCP that triggers the specified pipeline steps
aigear-scheduler --create --version v1 --step_names fetch_data,preprocessing,training
```

> **Tip:** Once created, you can go to [Cloud Scheduler](https://console.cloud.google.com/cloudscheduler) in the GCP Console to manually trigger an immediate run. A `--run` flag for triggering directly from the CLI is planned but not yet available (`aigear-scheduler --version v1 --run`).
To trigger a run immediately, pause, or manage the job lifecycle:

```bash
aigear-scheduler --run --version v1
aigear-scheduler --pause --version v1
aigear-scheduler --resume --version v1
aigear-scheduler --status --version v1
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the status returns ENABLED, PAUSED, missing, etc?

```

> See `aigear-scheduler --help` or the [CLI Reference](docs/cli-reference.md#aigear-scheduler) for all available commands.

---

Expand Down Expand Up @@ -179,7 +188,7 @@ See the full [CLI Reference](docs/cli-reference.md) for all commands and argumen
| `aigear-init` | Initialize a new project scaffold |
| `aigear-gcp-infra` | Create GCP infrastructure (buckets, IAM, Pub/Sub, schedulers) |
| `aigear-task` | Run a pipeline step (`workflow`) or start a gRPC model server (`grpc`) |
| `aigear-scheduler` | Create a Cloud Scheduler job for pipeline steps |
| `aigear-scheduler` | Manage Cloud Scheduler jobs (create / update / delete / run / pause / resume) |
| `aigear-image` | Build and/or push Docker images to Artifact Registry |
| `aigear-model-yaml` | Generate Kubernetes deployment YAML files for model services |
| `aigear-deploy-model` | Deploy or delete a gRPC model service (local Kubernetes or GCP) |
Expand Down
42 changes: 34 additions & 8 deletions docs/cli-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,47 @@ aigear-task grpc --version VERSION

### `aigear-scheduler`

Create a Cloud Scheduler job that triggers the given pipeline steps.
Manage Cloud Scheduler jobs that trigger pipeline steps via Pub/Sub.

```
aigear-scheduler --version VERSION --step_names STEPS [--create]
aigear-scheduler <command> --version VERSION [--step_names STEPS] [--env ENV]
```

| Command | `--step_names` required | Description |
|---|---|---|
| `--create` | yes | Create a new scheduler job (skips if already exists) |
| `--update` | yes | Update schedule and message body of an existing job |
| `--delete` | — | Delete the scheduler job |
| `--status` | — | Print the current status of the scheduler job |
| `--list` | — | List scheduler jobs filtered by name |
| `--run` | — | Manually trigger the scheduler job immediately |
| `--pause` | — | Pause the scheduler job (stops automatic execution) |
| `--resume` | — | Resume a paused scheduler job |

| Argument | Default | Description |
|---|---|---|
| `--create` | — | Create GCP scheduler job. Runs by default if omitted. |
| `--version` | — | Pipeline version (required) |
| `--step_names` | — | Comma-separated step names, e.g. `fetch_data,training` (required) |
| `--version` | — | Pipeline version (required for all commands) |
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argument Required Description
--version Yes Pipeline version.
--step_names Only for --create / --update Comma-separated step names, e.g. fetch_data,training.
--env No Deployment environment: staging or production. Default: staging.

| `--step_names` | — | Comma-separated step names, e.g. `fetch_data,training` (required for `--create` / `--update`) |
| `--env` | `staging` | Deployment environment for model service: `staging` or `production` |

**Examples**

```bash
# Create a scheduler job for two pipeline steps
aigear-scheduler --create --version logistic_regression --step_names fetch_data,training

# Update the schedule or message body
aigear-scheduler --update --version logistic_regression --step_names fetch_data,training --env production

# Trigger an immediate run without waiting for the cron schedule
aigear-scheduler --run --version logistic_regression

# Pause / resume
aigear-scheduler --pause --version logistic_regression
aigear-scheduler --resume --version logistic_regression
```

> `--version` and `--step_names` are required; the command will print a reminder and exit if either is missing.
>
> Future commands: `--delete`, `--update`, `--run`...
> The scheduler job name, cron schedule, and Pub/Sub topic are read from the `scheduler` block in `env.json` for the given `--version`.

---

Expand Down
3 changes: 2 additions & 1 deletion src/aigear/cli/artifacts_image.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse

from aigear.common.constant import DOCKERFILE_PIPELINE, DOCKERFILE_SERVICE
from aigear.deploy.gcp.artifacts_image import create_artifacts_image


Expand Down Expand Up @@ -36,7 +37,7 @@ def get_argument() -> argparse.Namespace:
def _run_images(args: argparse.Namespace) -> None:
if args.dockerfile_path is None:
print("No '--dockerfile_path' provided, operating on all default images.")
for dockerfile, is_service in [("Dockerfile.pl", False), ("Dockerfile.ms", True)]:
for dockerfile, is_service in [(DOCKERFILE_PIPELINE, False), (DOCKERFILE_SERVICE, True)]:
print(f"Processing image: '{dockerfile}'...")
success = create_artifacts_image(
dockerfile_path=dockerfile,
Expand Down
65 changes: 42 additions & 23 deletions src/aigear/cli/gcp_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,67 @@
import argparse

from aigear.deploy.gcp.scheduler import create_scheduler
from aigear.deploy.gcp.scheduler import (
create_scheduler,
update_scheduler,
delete_scheduler,
status_scheduler,
list_scheduler,
run_scheduler,
pause_scheduler,
resume_scheduler,
)


def get_argument() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--create",
action="store_true",
help="Create GCP scheduler job."
)
# Future commands:
# group.add_argument("--delete", action="store_true", help="...")
# group.add_argument("--update", action="store_true", help="...")
group.add_argument("--create", action="store_true", help="Create GCP scheduler job.")
group.add_argument("--update", action="store_true", help="Update an existing GCP scheduler job.")
group.add_argument("--delete", action="store_true", help="Delete a GCP scheduler job.")
group.add_argument("--status", action="store_true", help="Show status of a GCP scheduler job.")
group.add_argument("--list", action="store_true", help="List GCP scheduler jobs filtered by name.")
group.add_argument("--run", action="store_true", help="Manually trigger a GCP scheduler job.")
group.add_argument("--pause", action="store_true", help="Pause a GCP scheduler job.")
group.add_argument("--resume", action="store_true", help="Resume a paused GCP scheduler job.")
parser.add_argument("--version", default="",
help="Version of the pipeline.")
parser.add_argument("--step_names", default="",
help="Comma-separated names of the pipeline steps.")
help="Comma-separated names of the pipeline steps (required for --create / --update).")
parser.add_argument("--env", default="staging",
choices=["staging", "production"],
help="Deployment environment for model service yaml (default: from env.json).")
help="Deployment environment for model service yaml (default: staging).")
return parser.parse_args()


def gcp_scheduler() -> None:
args = get_argument()

missing = []
if not args.version:
missing.append("--version")
if not args.step_names:
missing.append("--step_names")
if missing:
print(f"Missing required argument(s): {', '.join(missing)}")
print("Missing required argument: --version")
return

step_names = args.step_names.split(",")
needs_step_names = args.create or args.update
if needs_step_names and not args.step_names:
print("Missing required argument: --step_names (required for --create / --update)")
return

step_names = args.step_names.split(",") if args.step_names else []

if args.create:
create_scheduler(args.version, step_names, args.env)
# Future commands:
# elif args.delete:
# delete_scheduler(args.version, step_names)
# elif args.update:
# update_scheduler(args.version, step_names)
elif args.update:
update_scheduler(args.version, step_names, args.env)
elif args.delete:
delete_scheduler(args.version)
elif args.status:
status_scheduler(args.version)
elif args.list:
list_scheduler(args.version)
elif args.run:
run_scheduler(args.version)
elif args.pause:
pause_scheduler(args.version)
elif args.resume:
resume_scheduler(args.version)
5 changes: 3 additions & 2 deletions src/aigear/cli/model_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse

from aigear.common.constant import ENV_PRODUCTION, ENV_STAGING
from aigear.service.grpc.constant import DEFAULT_GRPC_PORT
from aigear.deploy.gcp.grpc_gcp_deploy import delete_gcp_grpc, deploy_gcp_grpc
from aigear.deploy.local.grpc_local_deploy import delete_local_grpc, deploy_local_grpc

Expand All @@ -11,11 +12,11 @@ def get_argument() -> argparse.Namespace:
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--version',
help='Version of the pipeline')
parser.add_argument('--service_ports', default="50051",
parser.add_argument('--service_ports', default=DEFAULT_GRPC_PORT,
help='Internal interface of service')
parser.add_argument('--replicas', default=1,
help='Number of copies')
parser.add_argument('--port', default="50051",
parser.add_argument('--port', default=DEFAULT_GRPC_PORT,
help='External interface of service')

env_group = parser.add_mutually_exclusive_group(required=True)
Expand Down
4 changes: 4 additions & 0 deletions src/aigear/common/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@
ENV_LOCAL = "local"
ENV_STAGING = "staging"
ENV_PRODUCTION = "production"

# Standard Dockerfile names — used by artifacts_image CLI and project initialisation.
DOCKERFILE_PIPELINE = "Dockerfile.pl"
DOCKERFILE_SERVICE = "Dockerfile.ms"
9 changes: 5 additions & 4 deletions src/aigear/deploy/common/helm_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from aigear.common.config import get_project_name
from aigear.common.constant import ENV_LOCAL, VENV_BASE_DIR
from aigear.service.grpc.constant import DEFAULT_GRPC_PORT
from aigear.common.image import get_image_path
from aigear.common.logger import Logging

Expand Down Expand Up @@ -49,9 +50,9 @@ def _create_helm_chart(
helm_path,
service_name,
service_image,
service_ports: str = "50051",
service_ports: str = DEFAULT_GRPC_PORT,
replicas: int = 1,
port: str = "50051",
port: str = DEFAULT_GRPC_PORT,
pipeline_version=None,
model_class_path=None,
env: str = ENV_LOCAL,
Expand Down Expand Up @@ -106,9 +107,9 @@ def _create_helm_chart(
def create_helm_file(
pipeline_version: str = None,
model_class_path: str = None,
service_ports: str = "50051",
service_ports: str = DEFAULT_GRPC_PORT,
replicas: int = 1,
port: str = "50051",
port: str = DEFAULT_GRPC_PORT,
env: str = ENV_LOCAL,
venv: str = None,
force: bool = False,
Expand Down
5 changes: 3 additions & 2 deletions src/aigear/deploy/gcp/grpc_gcp_deploy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from aigear.common import run_sh
from aigear.common.config import AigearConfig, PipelinesConfig
from aigear.common.constant import ENV_STAGING
from aigear.service.grpc.constant import DEFAULT_GRPC_PORT
from aigear.common.logger import Logging
from aigear.deploy.common.helm_chart import create_helm_file, get_helm_path
from aigear.deploy.common.kubectl_command import helm_deploy, helm_deployment_delete
Expand All @@ -21,9 +22,9 @@ def switch_gcp_context(cluster_name: str, project_id: str, region: str) -> None:

def deploy_gcp_grpc(
pipeline_version: str | None = None,
service_ports: str = "50051",
service_ports: str = DEFAULT_GRPC_PORT,
replicas: int = 1,
port: str = "50051",
port: str = DEFAULT_GRPC_PORT,
env: str = ENV_STAGING,
):
pipe_config = PipelinesConfig.get_version_config(pipeline_version)
Expand Down
Loading