Dotflow is a lightweight Python library for execution pipelines. Define tasks with decorators, chain them together, and deploy to any cloud, with built-in retry, parallel execution, storage, observability, and cloud deployment.
- **Simple**: `@action` decorator + `workflow.start()`. That's it.
- **Resilient**: Retry, backoff, timeout, checkpoints, and error handling out of the box.
- **Observable**: OpenTelemetry traces, metrics, and logs. Sentry error tracking.
- **Deployable**: `dotflow deploy --platform lambda` ships your pipeline to AWS in one command.
- **Portable**: Same code runs on Lambda, ECS, Cloud Run, Alibaba FC, Kubernetes, Docker, or GitHub Actions.
```bash
pip install dotflow
```

```python
from dotflow import DotFlow, action

@action
def extract():
    return {"users": 150}

@action
def transform(previous_context):
    total = previous_context.storage["users"]
    return {"users": total, "active": int(total * 0.8)}

@action
def load(previous_context):
    print(f"Loaded {previous_context.storage['active']} active users")

workflow = DotFlow()
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.start()
```

Write your pipeline once. Deploy to any cloud with a single command.

```bash
dotflow init
dotflow deploy --platform lambda --project my_pipeline
```

Optional extras:

```bash
pip install dotflow[aws]            # S3 storage
pip install dotflow[gcp]            # Google Cloud Storage
pip install dotflow[scheduler]      # Cron scheduler
pip install dotflow[otel]           # OpenTelemetry
pip install dotflow[sentry]         # Sentry error tracking
pip install dotflow[deploy-aws]     # AWS deploy (Lambda, ECS)
pip install dotflow[deploy-gcp]     # GCP deploy (Cloud Run)
pip install dotflow[deploy-alibaba] # Alibaba Cloud deploy (FC)
pip install dotflow[deploy-github]  # GitHub Actions deploy
```

| Section | Description |
|---|---|
| Concepts | Workflows, tasks, context, providers, process modes |
| How-to Guides | Step-by-step tutorials for workflows, tasks, and CLI |
| Cloud Deployment | Deploy to AWS, GCP, Alibaba, Kubernetes, Docker, GitHub Actions |
| Integrations | OpenTelemetry, Sentry, Telegram, Discord, S3, GCS, Server |
| Examples | Real-world pipelines: ETL, health checks, async, scheduler |
| Reference | API reference for all classes and providers |
| Custom Providers | Build your own storage, notify, log, tracer, or metrics provider |
## Observability
OpenTelemetry docs | Sentry docs | Tracer docs | Metrics docs
Built-in support for OpenTelemetry and Sentry:
```python
from dotflow import Config
from dotflow.providers import LogOpenTelemetry, TracerOpenTelemetry, MetricsOpenTelemetry

# OpenTelemetry: traces, metrics, and structured logs
config = Config(
    log=LogOpenTelemetry(service_name="my-pipeline"),
    tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
)
```

```python
from dotflow import Config
from dotflow.providers import LogSentry, TracerSentry

# Sentry: error tracking + performance monitoring
config = Config(
    log=LogSentry(dsn="https://xxx@sentry.io/123"),
    tracer=TracerSentry(),
)
```

## Execution Modes
Dotflow supports 4 execution strategies out of the box:
Tasks run one after another. The context from each task flows to the next.
```python
workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.start()  # or mode="sequential"
```

```mermaid
flowchart LR
    A[task_a] --> B[task_b] --> C[Finish]
```
Same as sequential, but runs in a background thread, so the call is non-blocking.
```python
workflow.start(mode="background")
```

Every task runs simultaneously in its own process.
```python
workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.task.add(step=task_c)
workflow.start(mode="parallel")
```

```mermaid
flowchart TD
    S[Start] --> A[task_a] & B[task_b] & C[task_c]
    A & B & C --> F[Finish]
```
Assign tasks to named groups. Groups run in parallel, but tasks within each group run sequentially.
```python
workflow.task.add(step=fetch_users, group_name="users")
workflow.task.add(step=save_users, group_name="users")
workflow.task.add(step=fetch_orders, group_name="orders")
workflow.task.add(step=save_orders, group_name="orders")
workflow.start()
```

```mermaid
flowchart TD
    S[Start] --> G1[Group: users] & G2[Group: orders]
    G1 --> A[fetch_users] --> B[save_users]
    G2 --> C[fetch_orders] --> D[save_orders]
    B & D --> F[Finish]
```
## Retry, Timeout & Backoff
The `@action` decorator supports built-in resilience options:
```python
import requests

from dotflow import action

@action(retry=3, timeout=10, retry_delay=2, backoff=True)
def unreliable_api_call():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `retry` | `int` | `1` | Number of attempts before failing |
| `timeout` | `int` | `0` | Max seconds per attempt (0 = no limit) |
| `retry_delay` | `int` | `1` | Seconds to wait between retries |
| `backoff` | `bool` | `False` | Exponential backoff (delay doubles each retry) |
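To make the `backoff` semantics in the table concrete, here is a small plain-Python sketch of the delays that would be slept between attempts. It mirrors the documented behavior (delay doubles each retry), not Dotflow's internal implementation, and the helper name is ours:

```python
def retry_delays(retry: int, retry_delay: int = 1, backoff: bool = False) -> list[int]:
    """Delays slept between attempts: constant, or doubling when backoff=True."""
    delays = []
    for attempt in range(retry - 1):  # no sleep after the final attempt
        delays.append(retry_delay * (2 ** attempt) if backoff else retry_delay)
    return delays

print(retry_delays(retry=3, retry_delay=2, backoff=True))   # [2, 4]
print(retry_delays(retry=4, retry_delay=2, backoff=False))  # [2, 2, 2]
```

So with `retry=3, retry_delay=2, backoff=True`, a failing task waits 2 seconds, then 4 seconds, before giving up.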
## Context System
Tasks communicate through a context chain. Each task receives the previous task's output and can access its own initial context.
```python
from dotflow import DotFlow, action

@action
def step_one():
    return "Hello"

@action
def step_two(previous_context, initial_context):
    greeting = previous_context.storage  # "Hello"
    name = initial_context.storage       # "World"
    return f"{greeting}, {name}!"

workflow = DotFlow()
workflow.task.add(step=step_one)
workflow.task.add(step=step_two, initial_context="World")
workflow.start()
```

Each `Context` object contains:
- `storage`: the return value from the task
- `task_id`: the task identifier
- `workflow_id`: the workflow identifier
- `time`: timestamp of execution
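The context-chain rule (each task receives the previous task's return value) can be pictured with a minimal plain-Python sketch; this illustrates the data flow only and is not Dotflow's engine:

```python
def run_sequential(steps, initial=None):
    """Feed each step the previous step's return value; collect all outputs."""
    previous, outputs = initial, []
    for step in steps:
        previous = step(previous)
        outputs.append(previous)
    return outputs

outs = run_sequential([lambda _: "Hello", lambda prev: f"{prev}, World!"])
print(outs)  # ['Hello', 'Hello, World!']
```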
## Checkpoint & Resume
Resume a workflow from where it left off. Requires a persistent storage provider and a fixed `workflow_id`.
```python
from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile

config = Config(storage=StorageFile())
workflow = DotFlow(config=config, workflow_id="my-pipeline-v1")
workflow.task.add(step=step_a)
workflow.task.add(step=step_b)
workflow.task.add(step=step_c)

# First run: executes all tasks and saves checkpoints
workflow.start()

# If step_c failed, fix and re-run: skips step_a and step_b
workflow.start(resume=True)
```

## Storage Providers
Choose where task results are persisted:
```python
from dotflow import DotFlow

workflow = DotFlow()  # uses StorageDefault (in-memory)
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageFile

config = Config(storage=StorageFile(path=".output"))
workflow = DotFlow(config=config)
```

```bash
pip install dotflow[aws]
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageS3

config = Config(storage=StorageS3(bucket="my-bucket", prefix="pipelines/", region="us-east-1"))
workflow = DotFlow(config=config)
```

```bash
pip install dotflow[gcp]
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageGCS

config = Config(storage=StorageGCS(bucket="my-bucket", prefix="pipelines/", project="my-project"))
workflow = DotFlow(config=config)
```

## Notifications
Get notified about task status changes via Telegram or Discord.
```python
from dotflow import Config
from dotflow.providers import NotifyTelegram

config = Config(notify=NotifyTelegram(
    token="YOUR_BOT_TOKEN",
    chat_id=123456789,
))
```

```python
from dotflow import Config
from dotflow.providers import NotifyDiscord

config = Config(notify=NotifyDiscord(
    webhook_url="https://discord.com/api/webhooks/...",
))
```

## Class-Based Steps
Return a class instance from a task, and Dotflow will automatically discover and execute all `@action`-decorated methods in source order.
```python
from dotflow import DotFlow, action

class ETLPipeline:
    @action
    def extract(self):
        return {"raw": [1, 2, 3]}

    @action
    def transform(self, previous_context):
        data = previous_context.storage["raw"]
        return {"processed": [x * 2 for x in data]}

    @action
    def load(self, previous_context):
        print(f"Loaded: {previous_context.storage['processed']}")

@action
def run_pipeline():
    return ETLPipeline()

workflow = DotFlow()
workflow.task.add(step=run_pipeline)
workflow.start()
```

## Task Groups
Organize tasks into named groups for parallel group execution.
```python
workflow.task.add(step=scrape_site_a, group_name="scraping")
workflow.task.add(step=scrape_site_b, group_name="scraping")
workflow.task.add(step=process_data, group_name="processing")
workflow.task.add(step=save_results, group_name="processing")
workflow.start()  # groups run in parallel, tasks within each group run sequentially
```

## Callbacks
Execute a function after each task completes. Useful for logging, alerting, or side effects.
```python
def on_task_done(task):
    print(f"Task {task.task_id} finished with status: {task.status}")

workflow.task.add(step=my_step, callback=on_task_done)
```

Workflow-level callbacks for success and failure:

```python
def on_success(*args, **kwargs):
    print("All tasks completed!")

def on_failure(*args, **kwargs):
    print("Something went wrong.")

workflow.start(on_success=on_success, on_failure=on_failure)
```

## Error Handling
Control whether the workflow stops or continues when a task fails:
```python
# Stop on first failure (default)
workflow.start(keep_going=False)

# Continue executing remaining tasks even if one fails
workflow.start(keep_going=True)
```

Each task tracks its errors with full detail:
- Attempt number
- Exception type and message
- Traceback
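A plain-Python sketch of the `keep_going` behavior combined with per-task error capture; this illustrates the documented semantics, not Dotflow's engine, and the helper name is ours:

```python
def run_steps(steps, keep_going=False):
    """Run (name, fn) pairs in order, recording any exception per task.

    Stops at the first failure unless keep_going is True.
    """
    executed, errors = [], {}
    for name, fn in steps:
        executed.append(name)
        try:
            fn()
        except Exception as exc:
            errors[name] = exc
            if not keep_going:
                break
    return executed, errors
```

With steps `a`, `b` (failing), `c`: the default stops after `b`, while `keep_going=True` still runs `c` and reports `b`'s error at the end.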
Access results after execution:
```python
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status}")
    if task.errors:
        print(f"  Errors: {task.errors}")
```

## Async Support
`@action` automatically detects and handles async functions:
```python
import httpx

from dotflow import DotFlow, action

@action(timeout=30)
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

workflow = DotFlow()
workflow.task.add(step=fetch_data)
workflow.start()
```

## Scheduler / Cron
Cron scheduler docs | Default scheduler | Cron overlap (concepts)
Schedule workflows to run automatically using cron expressions.
```bash
pip install dotflow[scheduler]
```

```python
from dotflow import DotFlow, Config, action
from dotflow.providers import SchedulerCron

@action
def sync_data():
    return {"synced": True}

config = Config(scheduler=SchedulerCron(cron="*/5 * * * *"))
workflow = DotFlow(config=config)
workflow.task.add(step=sync_data)
workflow.schedule()
```

Control what happens when a new execution triggers while the previous one is still running:
| Strategy | Description |
|---|---|
| `skip` | Drops the new run if the previous is still active (default) |
| `queue` | Buffers one pending run, executes when the current finishes |
| `parallel` | Runs up to 10 concurrent executions via semaphore |
```python
from dotflow.providers import SchedulerCron

# Queue overlapping executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="queue")

# Allow parallel executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="parallel")
```

The scheduler handles graceful shutdown via SIGINT/SIGTERM signals automatically.
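The `skip` strategy can be illustrated with a small plain-Python sketch using a non-blocking lock; this is an illustration of the documented semantics (drop a trigger while a run is active), not Dotflow's implementation:

```python
import threading

def make_skip_trigger(run):
    """Wrap `run` so a trigger arriving while a run is active is dropped."""
    lock = threading.Lock()

    def trigger():
        if not lock.acquire(blocking=False):
            return False  # previous run still active: skip this trigger
        try:
            run()
            return True
        finally:
            lock.release()

    return trigger
```

A `queue` variant would instead remember at most one pending trigger and fire it once the lock is released.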
## CLI
Run workflows directly from the command line:
```bash
# Simple execution
dotflow start --step my_module.my_task

# With initial context
dotflow start --step my_module.my_task --initial-context '{"key": "value"}'

# With callback
dotflow start --step my_module.my_task --callback my_module.on_done

# With execution mode
dotflow start --step my_module.my_task --mode parallel

# With file storage
dotflow start --step my_module.my_task --storage file --path .output

# With S3 storage
dotflow start --step my_module.my_task --storage s3

# With GCS storage
dotflow start --step my_module.my_task --storage gcs

# Schedule with cron
dotflow schedule --step my_module.my_task --cron "*/5 * * * *"

# Schedule with overlap strategy
dotflow schedule --step my_module.my_task --cron "0 * * * *" --overlap queue

# Schedule with resume
dotflow schedule --step my_module.my_task --cron "0 */6 * * *" --storage file --resume
```

Available CLI commands:
| Command | Description |
|---|---|
| `dotflow init` | Scaffold a new project with cloud support |
| `dotflow start` | Run a workflow |
| `dotflow schedule` | Run a workflow on a cron schedule |
| `dotflow logs` | View execution logs |
| `dotflow cloud list` | Show available cloud platforms |
| `dotflow cloud generate --platform <name>` | Generate deployment files |
| `dotflow deploy --platform <name> --project <name>` | Deploy to cloud |
## Server Provider
Send workflow and task execution data to a remote API (e.g. dotflow-api) in real time.
```python
from dotflow import DotFlow, Config, action
from dotflow.providers import ServerDefault

@action
def my_task():
    return {"result": "ok"}

config = Config(
    server=ServerDefault(
        base_url="http://localhost:8000/api/v1",
        user_token="your-api-token",
    )
)

workflow = DotFlow(config=config)
workflow.task.add(step=my_task)
workflow.start()
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `base_url` | `str` | `""` | API base URL |
| `user_token` | `str` | `""` | API token (`X-User-Token` header) |
| `timeout` | `float` | `5.0` | HTTP request timeout in seconds |
The server provider automatically:

- Creates the workflow on `DotFlow()` init
- Creates each task on `task.add()`
- Updates task status on each transition (In progress, Completed, Failed, Retry)
- Updates workflow status on completion (In progress to Completed)
## Dependency Injection via Config
The `Config` class lets you swap providers for storage, notifications, logging, scheduling, and server:
```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageFile, NotifyTelegram, LogDefault, SchedulerCron, ServerDefault

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(token="...", chat_id=123),
    log=LogDefault(),
    scheduler=SchedulerCron(cron="0 * * * *"),
    server=ServerDefault(base_url="...", user_token="..."),
)

workflow = DotFlow(config=config)
```

Extend Dotflow by implementing the abstract base classes:
| ABC | Methods | Purpose |
|---|---|---|
| `Storage` | `post`, `get`, `key` | Custom storage backends |
| `Notify` | `hook_status_task` | Custom notification channels |
| `Log` | `info`, `error`, `warning`, `debug` | Custom logging |
| `Scheduler` | `start`, `stop` | Custom scheduling strategies |
| `Tracer` | `start_workflow`, `end_workflow`, `start_task`, `end_task` | Distributed tracing |
| `Metrics` | `workflow_started`, `workflow_completed`, `workflow_failed`, `task_completed`, `task_failed`, `task_retried` | Counters and histograms |
| `Server` | `create_workflow`, `update_workflow`, `create_task`, `update_task` | Remote API communication |
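As a sketch, a custom `Log` provider only needs the four methods listed in the table. The class below routes messages to the standard-library `logging` module; it is shown as a plain class with assumed single-argument signatures, whereas a real provider would subclass Dotflow's `Log` ABC:

```python
import logging

class LogStdlib:
    """Sketch of a custom log provider that forwards to the stdlib logger."""

    def __init__(self, name: str = "dotflow"):
        self._logger = logging.getLogger(name)

    def info(self, message):
        self._logger.info(message)

    def error(self, message):
        self._logger.error(message)

    def warning(self, message):
        self._logger.warning(message)

    def debug(self, message):
        self._logger.debug(message)
```

It would then be wired in via `Config(log=LogStdlib())`, exactly like the built-in providers above.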
## Results & Inspection
After execution, inspect results directly from the workflow object:
```python
workflow.start()

# List of Task objects
tasks = workflow.result_task()

# List of Context objects (one per task)
contexts = workflow.result_context()

# List of storage values (raw return values)
storages = workflow.result_storage()

# Serialized result (Pydantic model)
result = workflow.result()
```

Task builder utilities:

```python
workflow.task.count()    # Number of tasks
workflow.task.clear()    # Remove all tasks
workflow.task.reverse()  # Reverse execution order
workflow.task.schema()   # Pydantic schema of the workflow
```

## Dynamic Module Import
Reference tasks and callbacks by their module path string instead of importing them directly:
```python
workflow.task.add(step="my_package.tasks.process_data")
workflow.task.add(step="my_package.tasks.save_results", callback="my_package.callbacks.notify")
```

All examples are available in the `docs_src/` directory.
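The dotted-path step references above can be resolved with roughly this standard-library recipe; it is a sketch of the idea, not Dotflow's actual loader:

```python
import importlib

def resolve(dotted_path: str):
    """Import 'package.module.attr' and return the attribute."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)

# e.g. resolve("os.path.join") returns the os.path.join function
```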
| Icon | Type | Description |
|---|---|---|
| ⚙️ | FEATURE | New feature |
|  | PEP8 | Formatting fixes following PEP8 |
|  | ISSUE | Reference to issue |
| 🪲 | BUG | Bug fix |
|  | DOCS | Documentation changes |
| 📦 | PyPI | PyPI releases |
| ❤️ | TEST | Automated tests |
| ⬆️ | CI/CD | Changes in continuous integration/delivery |
|  | SECURITY | Security improvements |
This project is licensed under the terms of the MIT License.
