- Overview
- Task Hierarchy
- HTTP Tasks
- File Tasks
- Database Tasks
- Processing Tasks
- Timing Tasks
- Logging Tasks
- Utility Tasks
- Creating Custom Tasks
Tasks are the atomic units of work in the Workflow Engine. Each task implements a single responsibility and can be composed into complex workflows. Tasks read inputs from and write outputs to the shared WorkflowContext.
All tasks implement the Task interface:
```java
public interface Task {
    void execute(WorkflowContext context) throws TaskExecutionException;
    String getName();
}
```
- AbstractTask: Base class providing common task infrastructure
- AbstractHttpTask: Base class for HTTP tasks with request/response handling
Task (interface)
│
├── AbstractTask (abstract)
│ ├── DelayTask
│ ├── NoOpTask
│ ├── LogTask
│ ├── FileReadTask
│ ├── FileWriteTask
│ ├── ShellCommandTask
│ ├── JdbcQueryTask
│ ├── JdbcTypedQueryTask
│ ├── JdbcStreamingQueryTask
│ ├── JdbcUpdateTask
│ ├── JdbcBatchUpdateTask
│ ├── JdbcCallableTask
│ ├── JdbcTransactionTask
│ └── AbstractHttpTask (abstract)
│ ├── GetHttpTask
│ ├── PostHttpTask
│ ├── PutHttpTask
│ └── DeleteHttpTask
│
└── Custom implementations
Performs HTTP GET requests with automatic JSON handling.
Purpose: Retrieve data from RESTful APIs
Features:
- Automatic `Accept: application/json` header
- Type-safe response deserialization
- Query parameter support
- Custom headers
Builder Configuration:
```java
GetHttpTask.Builder<T>
    .url(String)                  // Target URL
    .urlFromContext(String)       // URL from context key
    .header(String, String)       // Add custom header
    .headers(Map<String, String>) // Add multiple headers
    .responseType(Class<T>)       // Response type for deserialization
    .responseContextKey(String)   // Context key for response (default: "httpResponse")
    .build()
```
Examples:
```java
public void example() {
    HttpClient client = HttpClient.newHttpClient();
    WorkflowContext context = new WorkflowContext();

    // Simple GET
    GetHttpTask<String> task = new GetHttpTask.Builder<String>(client)
        .url("https://api.example.com/users")
        .build();
    task.execute(context);
    String response = context.getTyped("httpResponse", String.class);

    // Typed response with auth
    GetHttpTask<User> userTask = new GetHttpTask.Builder<User>(client)
        .url("https://api.example.com/users/123")
        .responseType(User.class)
        .header("Authorization", "Bearer " + token)
        .responseContextKey("userData")
        .build();
    userTask.execute(context);
    User user = context.getTyped("userData", User.class);

    // Dynamic URL from context
    context.put("apiUrl", "https://api.example.com/search");
    context.put("queryParams", Map.of("q", "java", "limit", "10"));
    GetHttpTask<String> searchTask = new GetHttpTask.Builder<String>(client)
        .urlFromContext("apiUrl")
        .build();
    searchTask.execute(context);
}
```
Context Usage:
- Inputs:
  - Optional `queryParams` (Map<String, String>) for dynamic query parameters
  - Optional URL from context if using `urlFromContext()`
- Outputs:
  - Response stored at configured key (default: "httpResponse")
Performs HTTP POST requests with JSON or form data.
Purpose: Create resources or submit data to APIs
Features:
- JSON body support
- Form data URL encoding
- Automatic Content-Type header
- Response deserialization
Builder Configuration:
```java
PostHttpTask.Builder<T>
    .url(String)
    .urlFromContext(String)
    .body(String)               // JSON body (highest precedence)
    .form(Map<String, String>)  // Form data (URL-encoded)
    .header(String, String)
    .responseType(Class<T>)
    .responseContextKey(String)
    .build()
```
Body Priority:
1. Explicit `body()` parameter
2. Context key "REQUEST_BODY"
3. Form data via `form()`
4. Empty body
Examples:
```java
public void example() {
    HttpClient client = HttpClient.newHttpClient();
    WorkflowContext context = new WorkflowContext();

    // JSON body
    PostHttpTask<ApiResponse> task = new PostHttpTask.Builder<ApiResponse>(client)
        .url("https://api.example.com/users")
        .body("{\"name\":\"John\",\"email\":\"john@example.com\"}")
        .responseType(ApiResponse.class)
        .build();

    // Form submission
    PostHttpTask<String> formTask = new PostHttpTask.Builder<String>(client)
        .url("https://api.example.com/login")
        .form(Map.of(
            "username", "john",
            "password", "secret123"
        ))
        .build();

    // Body from context
    context.put("REQUEST_BODY", userJson);
    PostHttpTask<User> contextBodyTask = new PostHttpTask.Builder<User>(client)
        .url("https://api.example.com/users")
        .responseType(User.class)
        .build();
}
```
Context Usage:
- Inputs:
  - Optional `REQUEST_BODY` (String) for body content
  - Optional URL from context
- Outputs:
  - Response stored at configured key
Performs HTTP PUT requests for updating resources.
Purpose: Update existing resources
Features: Same as PostHttpTask (JSON body, form data, etc.)
Examples:
```java
// Update user
PutHttpTask<User> task = new PutHttpTask.Builder<User>(client)
    .url("https://api.example.com/users/123")
    .body("{\"name\":\"John Updated\"}")
    .responseType(User.class)
    .build();

// Form update
PutHttpTask<String> settingsTask = new PutHttpTask.Builder<String>(client)
    .url("https://api.example.com/settings")
    .form(Map.of("theme", "dark", "language", "en"))
    .build();
```
Performs HTTP DELETE requests.
Purpose: Delete resources
Features:
- Simple DELETE operations
- Optional request body
- Response handling
Examples:
```java
// Simple delete
DeleteHttpTask<String> task = new DeleteHttpTask.Builder<String>(client)
    .url("https://api.example.com/users/123")
    .build();

// Delete with auth
DeleteHttpTask<ApiResponse> authTask = new DeleteHttpTask.Builder<ApiResponse>(client)
    .url("https://api.example.com/sessions/" + sessionId)
    .header("Authorization", "Bearer " + token)
    .responseType(ApiResponse.class)
    .build();
```
Reads file contents into context.
Purpose: Load data from files
Features:
- Path-based file reading
- Encoding support
- UTF-8 default encoding
Constructor:
```java
FileReadTask(Path filePath, String contextKey);
```
Examples:
```java
public void example() {
    WorkflowContext context = new WorkflowContext();

    // Read JSON file
    FileReadTask task = new FileReadTask(
        Path.of("data/users.json"),
        "userData"
    );
    task.execute(context);
    String jsonData = context.getTyped("userData", String.class);

    // Read configuration
    FileReadTask configTask = new FileReadTask(
        Path.of("config/application.properties"),
        "config"
    );
}
```
Context Usage:
- Inputs: None
- Outputs:
  - File contents stored at specified context key (String)
Error Handling:
- Throws `TaskExecutionException` on:
  - File not found
  - Permission errors
  - I/O errors
Writes context data to files.
Purpose: Persist processed data
Features:
- Path-based file writing
- Creates parent directories if needed
- UTF-8 encoding
- Overwrites existing files
Constructor:
```java
FileWriteTask(Path filePath, String contextKey);
```
Examples:
```java
public void example() {
    WorkflowContext context = new WorkflowContext();

    // Write processed data
    context.put("processedData", jsonString);
    FileWriteTask task = new FileWriteTask(
        Path.of("output/results.json"),
        "processedData"
    );
    task.execute(context);

    // Save report
    context.put("report", reportContent);
    FileWriteTask reportTask = new FileWriteTask(
        Path.of("reports/daily-report.txt"),
        "report"
    );
}
```
Context Usage:
- Inputs:
  - Data from specified context key (toString() called)
- Outputs: None
Error Handling:
- Throws `TaskExecutionException` on:
  - Permission errors
  - Disk full
  - I/O errors
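The write behavior described above (UTF-8 encoding, parent-directory creation, overwriting existing files) corresponds roughly to the following self-contained java.nio sketch. `FileWriteSketch` and its helper methods are illustrative names, not part of the engine:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileWriteSketch {
    // Roughly what FileWriteTask is documented to do: UTF-8 write,
    // parent directories created if needed, existing file overwritten.
    static void write(Path target, String content) {
        try {
            if (target.getParent() != null) {
                Files.createDirectories(target.getParent()); // create parents if needed
            }
            Files.writeString(target, content, StandardCharsets.UTF_8); // overwrites by default
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String read(Path target) {
        try {
            return Files.readString(target, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A throwaway destination under a temp directory, for demonstration.
    static Path tempTarget() {
        try {
            return Files.createTempDirectory("wf-demo").resolve("out/results.json");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path target = tempTarget();
        write(target, "{\"ok\":true}");
        System.out.println(read(target)); // {"ok":true}
    }
}
```

The overwrite semantics mean a second write to the same path silently replaces the first, which is what the task documentation above promises.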
Executes SQL SELECT queries using JDBC and returns results as a list of maps.
Purpose: Query databases and retrieve data
Features:
- Parameterized query support (prevents SQL injection)
- Automatic result set mapping to List<Map<String, Object>>
- Connection pooling support via DataSource
- Handles JDBC-specific types (Clob, Array, etc.)
- Preserves column order using LinkedHashMap
Builder Configuration:
```java
JdbcQueryTask.builder()
    .dataSource(DataSource)     // JDBC DataSource (required)
    .readingSqlFrom(String)     // Context key for SQL query (required)
    .readingParamsFrom(String)  // Context key for parameters (required)
    .writingResultsTo(String)   // Context key for results (required)
    .build()
```
Constructor:
```java
JdbcQueryTask(
    DataSource dataSource,
    String sqlKey,
    String paramsKey,
    String outputKey
);
```
Examples:
```java
// Basic query with parameters
public void basicQuery() {
    DataSource dataSource = createDataSource(); // Your connection pool
    WorkflowContext context = new WorkflowContext();
    context.put("sql", "SELECT id, name, email FROM users WHERE status = ? AND created_at > ?");
    context.put("params", Arrays.asList("ACTIVE", LocalDate.of(2024, 1, 1)));
    JdbcQueryTask task = JdbcQueryTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("sql")
        .readingParamsFrom("params")
        .writingResultsTo("queryResults")
        .build();
    task.execute(context);
    List<Map<String, Object>> rows = (List<Map<String, Object>>) context.get("queryResults");
    for (Map<String, Object> row : rows) {
        Integer id = (Integer) row.get("id");
        String name = (String) row.get("name");
        String email = (String) row.get("email");
        // Process row...
    }
}

// Query without parameters
public void simpleQuery() {
    WorkflowContext context = new WorkflowContext();
    context.put("sql", "SELECT * FROM products ORDER BY price DESC LIMIT 10");
    context.put("params", Collections.emptyList());
    JdbcQueryTask task = new JdbcQueryTask(
        dataSource,
        "sql",
        "params",
        "topProducts"
    );
    task.execute(context);
    List<Map<String, Object>> products = context.get("topProducts");
}

// Complex query with joins
public void complexQuery() {
    WorkflowContext context = new WorkflowContext();
    context.put("userQuery",
        "SELECT u.id, u.name, o.order_id, o.total " +
        "FROM users u " +
        "LEFT JOIN orders o ON u.id = o.user_id " +
        "WHERE u.country = ? AND o.status = ?");
    context.put("userParams", Arrays.asList("USA", "COMPLETED"));
    JdbcQueryTask task = JdbcQueryTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("userQuery")
        .readingParamsFrom("userParams")
        .writingResultsTo("userOrders")
        .build();
}

// In a workflow
public Workflow buildUserReportWorkflow(DataSource dataSource) {
    JdbcQueryTask queryTask = JdbcQueryTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("reportSql")
        .readingParamsFrom("reportParams")
        .writingResultsTo("reportData")
        .build();
    return SequentialWorkflow.builder()
        .name("UserReport")
        .task(context -> {
            // Prepare query
            context.put("reportSql", "SELECT * FROM user_stats WHERE date >= ?");
            context.put("reportParams", List.of(LocalDate.now().minusDays(30)));
        })
        .task(queryTask)
        .task(context -> {
            // Process results
            List<Map<String, Object>> data = context.get("reportData");
            generateReport(data);
        })
        .build();
}
```
Context Usage:
- Inputs:
  - SQL query string (from configured sqlKey)
  - Optional List parameters (from configured paramsKey, empty list if not present)
- Outputs:
  - List<Map<String, Object>> results (at configured outputKey)
  - Each Map represents one row with column names as keys
Error Handling:
- Throws `TaskExecutionException` on:
  - Database connection failures
  - SQL syntax errors
  - Parameter binding errors
  - Type conversion errors
- All JDBC resources (Connection, PreparedStatement, ResultSet) are automatically closed
Best Practices:
- Always use parameterized queries (?) to prevent SQL injection
- Use connection pooling (HikariCP, Apache DBCP) for DataSource
- Keep queries simple and focused
- Consider indexing columns used in WHERE clauses
- Handle large result sets appropriately (pagination, streaming)
Executes SQL INSERT, UPDATE, and DELETE statements using JDBC.
Purpose: Modify database data
Features:
- Parameterized statement support
- Returns affected row count
- Transaction support (via DataSource configuration)
- Connection pooling support
Context Usage:
- Inputs:
  - SQL statement string (from configured sqlKey)
  - Optional List parameters (from configured paramsKey, empty list if not present)
- Outputs:
  - Integer rows affected count (at configured outputKey)
Error Handling:
- Throws `TaskExecutionException` on:
  - Database connection failures
  - SQL syntax errors
  - Constraint violations (unique, foreign key, etc.)
  - Parameter binding errors
- All JDBC resources are automatically closed
Best Practices:
- Use parameterized statements to prevent SQL injection
- Check rowsAffected to verify operation success
- Consider using database transactions for multiple updates
- Handle constraint violations gracefully
- Use appropriate DataSource transaction isolation levels
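JdbcUpdateTask's builder is not shown in this document. A sketch under the assumption that it mirrors JdbcQueryTask's fluent style — `readingSqlFrom`, `readingParamsFrom`, and `writingRowCountTo` are assumed method names, not confirmed API:

```java
// Hypothetical usage; builder method names are assumptions mirroring JdbcQueryTask.
WorkflowContext context = new WorkflowContext();
context.put("sql", "UPDATE users SET status = ? WHERE last_login < ?");
context.put("params", Arrays.asList("INACTIVE", LocalDate.now().minusYears(1)));

JdbcUpdateTask task = JdbcUpdateTask.builder()
    .dataSource(dataSource)
    .readingSqlFrom("sql")              // assumed method name
    .readingParamsFrom("params")        // assumed method name
    .writingRowCountTo("rowsAffected")  // assumed method name
    .build();
task.execute(context);

// Check the affected-row count, as recommended above
Integer rowsAffected = context.getTyped("rowsAffected", Integer.class);
```

Consult the actual builder's Javadoc for the real method names before copying this pattern.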
Executes a single SQL statement repeatedly with multiple parameter sets using JDBC batching.
Purpose: Efficient bulk inserts, updates, and deletes
Features:
- Batch execution for better performance
- Multiple parameter sets with single SQL template
- Returns individual row counts for each statement
- Efficient for large data volumes
Context Usage:
- Inputs:
  - SQL template string (from configured sqlKey)
  - List<List> batch parameters (from configured batchParamsKey)
  - Each inner List represents parameters for one statement execution
- Outputs:
  - int[] array of affected row counts (at configured outputKey)
  - Each element corresponds to one batch statement
Error Handling:
- Throws `TaskExecutionException` on:
  - Database connection failures
  - SQL syntax errors
  - Constraint violations
  - Parameter binding errors
- All JDBC resources are automatically closed
- If any statement in batch fails, entire batch may be rolled back (depends on database and transaction settings)
Performance Optimization:
- Much faster than individual statements for bulk operations
- Reduces network round-trips to database
- Consider batch size limits (typically 1000-5000 rows per batch)
- May want to split very large datasets into multiple batches
- Some databases have specific batch optimizations
Best Practices:
- Use for bulk operations (100+ rows)
- Keep batch sizes reasonable (1000-5000 typical)
- Use transactions to ensure atomicity
- Monitor memory usage with large batches
- Test rollback behavior with your database
- Validate data before batching to avoid partial failures
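As with JdbcUpdateTask, the batch task's builder is not documented here; the sketch below assumes it mirrors JdbcQueryTask's style (`readingSqlFrom`, `readingBatchParamsFrom`, and `writingRowCountsTo` are assumed method names):

```java
// Hypothetical usage; builder method names are assumptions.
WorkflowContext context = new WorkflowContext();
context.put("sql", "INSERT INTO audit_log (user_id, action) VALUES (?, ?)");
context.put("batchParams", List.of(          // one inner list per statement execution
    List.of(1, "LOGIN"),
    List.of(2, "LOGOUT"),
    List.of(3, "LOGIN")
));

JdbcBatchUpdateTask task = JdbcBatchUpdateTask.builder()
    .dataSource(dataSource)
    .readingSqlFrom("sql")                   // assumed method name
    .readingBatchParamsFrom("batchParams")   // assumed method name
    .writingRowCountsTo("rowCounts")         // assumed method name
    .build();
task.execute(context);

int[] rowCounts = (int[]) context.get("rowCounts"); // one count per parameter set
```

For datasets larger than a few thousand rows, split the parameter list into multiple batches as advised above.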
Executes SQL SELECT queries and maps each row to a typed object via a RowMapper.
Purpose: Type-safe database queries
Features:
- Type-safe mapping to POJOs, DTOs, or records
- Custom RowMapper for flexible conversions
- Compile-time type checking
- Better IDE support and refactoring
- Reusable mappers across queries
- All benefits of JdbcQueryTask (parameterization, connection pooling, etc.)
Context Usage:
- Inputs:
  - SQL query string (direct or from context key)
  - Optional List parameters (direct or from context key)
  - RowMapper function (direct or from context key)
- Outputs:
  - List typed results (at configured outputKey)
Error Handling:
- Throws `TaskExecutionException` on:
  - Database connection failures
  - SQL syntax errors
  - Parameter binding errors
  - Mapping errors (exceptions in RowMapper)
  - Type conversion errors
- All JDBC resources are automatically closed
Best Practices:
- Create reusable RowMapper instances for common entity types
- Use Java records for immutable data objects
- Handle null values appropriately in mapper
- Consider using static factory methods for complex mappers
- Validate data during mapping
- Keep mappers focused on single responsibility
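The RowMapper interface itself is not shown in this document. Assuming it consumes a column-name-to-value map of the kind JdbcQueryTask produces, a reusable mapper for a Java record might look like the following (the `Function`-based shape is an assumption; the record, null handling, and reuse pattern are the point):

```java
import java.util.Map;
import java.util.function.Function;

public class RowMapperSketch {
    // Immutable data object, as recommended above.
    record User(int id, String name, String email) {}

    // Reusable mapper; defends against a missing email column.
    static final Function<Map<String, Object>, User> USER_MAPPER = row -> new User(
        (Integer) row.get("id"),
        (String) row.get("name"),
        row.get("email") == null ? "" : (String) row.get("email")
    );

    public static void main(String[] args) {
        // A row as the query tasks would represent it: column name -> value.
        Map<String, Object> row = Map.of("id", 7, "name", "Ada", "email", "ada@example.com");
        User user = USER_MAPPER.apply(row);
        System.out.println(user.name()); // Ada
    }
}
```

Keeping the mapper as a named constant lets several queries share one conversion and keeps null handling in a single place.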
Executes SQL SELECT queries and streams rows to a callback instead of materializing the full result set.
Purpose: Process large result sets with bounded memory
Features:
- Row-by-row processing using callbacks
- Memory efficient - only one row in memory at a time
- Configurable fetch size for optimization
- Forward-only cursor for maximum performance
- Early termination support
- Prevents OutOfMemoryError on large result sets
- Tracks total rows processed
Context Usage:
- Inputs:
  - SQL query string (direct or from context key)
  - Optional List parameters (direct or from context key)
  - Consumer<Map<String, Object>> callback (direct or from context key)
- Outputs:
  - Long row count (at configured rowCountKey)
Error Handling:
- Throws `TaskExecutionException` on:
  - Database connection failures
  - SQL syntax errors
  - Parameter binding errors
  - Callback exceptions (processing stops immediately)
  - Query timeout
- All JDBC resources are automatically closed
- Processing stops on first callback exception
Performance Optimization:
- Fetch Size: Configures JDBC driver fetch size (default: 1000)
- Larger values: Fewer network round-trips, more memory
- Smaller values: More round-trips, less memory
- Typical range: 100-5000 depending on row size
- Forward-Only Cursor: ResultSet configured for forward-only traversal
- Read-Only Mode: ResultSet in read-only mode for better performance
- Query Timeout: Optional timeout to prevent long-running queries
Best Practices:
- Use for queries returning >10,000 rows or large row sizes
- Set appropriate fetch size based on row size and memory
- Keep callback processing fast (avoid heavy I/O or blocking)
- Consider using for ETL operations
- Handle errors gracefully in callbacks
- Use for exports to files or external systems
- Monitor memory usage in production
- Consider batch processing within callback for efficiency
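The "batch processing within callback" advice above can be sketched as a self-contained `Consumer<Map<String, Object>>` that buffers rows and flushes them in groups, keeping per-row work cheap. `BatchingConsumer` is an illustrative helper, not part of the engine; in practice the flush target would be a file writer or bulk API call rather than a list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class StreamingCallbackSketch {
    // Buffers streamed rows and flushes them in batches.
    static class BatchingConsumer implements Consumer<Map<String, Object>> {
        private final int batchSize;
        private final List<Map<String, Object>> buffer = new ArrayList<>();
        final List<List<Map<String, Object>>> flushed = new ArrayList<>();

        BatchingConsumer(int batchSize) { this.batchSize = batchSize; }

        @Override
        public void accept(Map<String, Object> row) {
            buffer.add(row);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        // Must be called once more after the query finishes, to emit the final partial batch.
        void flush() {
            if (!buffer.isEmpty()) {
                flushed.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        BatchingConsumer callback = new BatchingConsumer(2);
        callback.accept(Map.of("id", 1));
        callback.accept(Map.of("id", 2)); // triggers first flush
        callback.accept(Map.of("id", 3));
        callback.flush();                 // flush final partial batch
        System.out.println(callback.flushed.size()); // 2
    }
}
```

The same object would be handed to the streaming task as its callback; only one batch of rows is ever held in memory, which is the property the task exists to provide.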
JdbcCallableTask features:
- Stored procedure and function support
- IN, OUT, and INOUT parameter handling
- Return value support for functions
- Multiple result set support
- Parameterized call statements
- Automatic type mapping between JDBC and Java types
Call Statement Syntax:
- Stored Procedure: `{call procedure_name(?, ?, ?)}`
- Function: `{? = call function_name(?, ?)}`
- Package Procedure (Oracle): `{call package.procedure_name(?, ?)}`
Parameter Mapping:
- IN: Map position (1-based) → value
- OUT: Map position (1-based) → SQL type constant from java.sql.Types
- INOUT: Specify in both IN and OUT maps
Context Usage:
- Inputs:
  - Call statement string (direct or from context key)
  - Map<Integer, Object> IN parameters (direct or from context key)
  - Map<Integer, Integer> OUT parameter types (direct or from context key)
- Outputs:
  - Map<Integer, Object> OUT values (at configured outValuesKey)
  - List<List<Map<String, Object>>> result sets (at configured resultSetsKey)
Error Handling:
- Throws `TaskExecutionException` on:
  - Database connection failures
  - SQL syntax errors
  - Stored procedure errors
  - Parameter binding errors
  - Type conversion errors
- All JDBC resources are automatically closed
Supported SQL Type Constants (java.sql.Types):
- INTEGER, BIGINT, SMALLINT, TINYINT
- DECIMAL, NUMERIC, DOUBLE, FLOAT, REAL
- VARCHAR, CHAR, LONGVARCHAR, CLOB
- DATE, TIME, TIMESTAMP
- BOOLEAN, BIT
- BINARY, VARBINARY, BLOB
- ARRAY, STRUCT
Best Practices:
- Use stored procedures for complex business logic in database
- Document expected parameter positions and types
- Handle OUT parameter types correctly based on database
- Test stored procedures independently before workflow integration
- Use transactions when calling multiple procedures
- Consider error handling within stored procedures
- Use meaningful OUT parameter names in context
JdbcTransactionTask features:
- Transaction management with automatic commit/rollback
- Multiple tasks execute within single transaction
- Connection sharing across all nested tasks
- Configurable transaction isolation level
- Auto-commit disabled during transaction
- Automatic rollback on any task failure
- Thread-safe with proper DataSource
Isolation Levels (Connection constants):
- `Connection.TRANSACTION_READ_UNCOMMITTED` - Lowest isolation, allows dirty reads
- `Connection.TRANSACTION_READ_COMMITTED` - Prevents dirty reads (default for most DBs)
- `Connection.TRANSACTION_REPEATABLE_READ` - Prevents dirty and non-repeatable reads
- `Connection.TRANSACTION_SERIALIZABLE` - Highest isolation, fully isolated
- Default: Database default isolation level
"_jdbc_transaction_connection"- Internal key used to share connection across tasks- Nested tasks should use this connection if they need database access
Context Usage:
- Inputs:
  - List tasks (direct or from context key)
  - All nested tasks read from shared context
- Outputs:
  - Nested tasks write to context normally
  - Special key `"_jdbc_transaction_connection"` used internally
Transaction Semantics:
- Acquires connection from DataSource
- Disables auto-commit
- Sets configured isolation level (if specified)
- Stores connection in context for nested tasks
- Executes all tasks sequentially
- On success: Commits transaction
- On failure: Rolls back transaction and throws exception
- Always: Removes connection from context and closes it
Error Handling:
- Throws `TaskExecutionException` on:
  - Connection acquisition failure
  - Any nested task failure (triggers rollback)
  - Commit failure
  - Rollback failure (logged but original exception propagated)
- All JDBC resources are automatically closed
- Transaction always rolled back on any task failure
Best Practices:
- Keep transactions short to minimize lock contention
- Use appropriate isolation level for your use case:
- READ_COMMITTED: Good default for most applications
- REPEATABLE_READ: When you need consistent reads
- SERIALIZABLE: For critical financial transactions
- Avoid long-running operations inside transactions
- Be aware of deadlock potential with multiple concurrent transactions
- Use connection pooling (HikariCP recommended)
- Test rollback behavior thoroughly
- Monitor transaction duration in production
- Consider retry logic for transient failures
- Don't mix transaction boundaries (nested tasks should not start new transactions)
Deadlock Prevention:
- Access tables in consistent order across transactions
- Keep transactions short
- Use appropriate isolation level (lower = less locking)
- Consider optimistic locking strategies
- Monitor for deadlocks in production
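Combining the semantics above with the builder methods this document lists for JdbcTransactionTask (`.dataSource`, `.task`, `.isolationLevel`), a money-transfer-style sketch looks like this. `debitTask` and `creditTask` are placeholders for any JDBC tasks you compose:

```java
// Both nested tasks commit together, or both roll back if either throws.
JdbcTransactionTask transferTask = JdbcTransactionTask.builder()
    .dataSource(dataSource)
    .isolationLevel(Connection.TRANSACTION_READ_COMMITTED)
    .task(debitTask)   // e.g. an update debiting one account
    .task(creditTask)  // e.g. an update crediting another
    .build();

// Rollback happens automatically if any nested task throws;
// on success the transaction is committed and the connection closed.
transferTask.execute(context);
```

Nested tasks should pick up the shared connection via the `"_jdbc_transaction_connection"` context key rather than opening their own, per the special-key note above.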
ShellCommandTask features:
- Command execution
- Working directory support
- stdout/stderr capture
- Exit code checking
Context Usage:
- Inputs: Command arguments can reference context
- Outputs:
  - stdout at specified key
  - stderr at specified key
  - exit code at specified key
Security Considerations:
- ⚠️ Be careful with user input in commands
- Validate command arguments
- Consider command injection risks
- Run with minimal privileges
DelayTask features:
- Configurable delay duration in milliseconds
- Thread blocking behavior
- Respects thread interruption
- Idempotent operation
Context Usage:
- Inputs: None
- Outputs: None (no context mutation)
Error Handling:
- Throws `TaskExecutionException` if thread is interrupted during delay
- Re-interrupts the thread to preserve interrupt status
- All resources are cleaned up properly
Performance Considerations:
- Thread Blocking: Blocks the executing thread for entire duration
- Parallel Workflows: Reduces parallelism if used in parallel execution
- No Async Alternative: This is a synchronous blocking delay (no async version currently)
Best Practices:
- Use for rate limiting between API calls
- Use for polling intervals with bounded retries
- Keep delays reasonable (typically milliseconds to seconds)
- Consider using in sequential workflows rather than parallel
- Avoid very long delays in parallel execution paths
- Test timeout behavior with delays in critical paths
- Monitor total workflow duration when using multiple delays
Thread Safety:
- This task is thread-safe
- Multiple threads can safely execute delay tasks concurrently
- Each thread gets its own independent delay
LogTask emits log messages through SLF4J.
Purpose: Structured logging at workflow steps
Features:
- Multiple log levels: TRACE, DEBUG, INFO, WARN, ERROR
- Configurable logger name
- Optional exception logging
- Message parameter formatting
- Efficient level checking to avoid unnecessary formatting
- Fluent builder API
Context Usage:
- Inputs (from context, with fallback to builder values):
  - Optional `message` (String) - Override builder message
  - Optional `parameters` (Object[]) - Override builder parameters
  - Optional `throwable` (Throwable) - Override builder throwable
- Outputs: None (logging only, no context mutation)
Logger Configuration:
- If both logger instance and name are provided: the logger instance takes precedence
- Default logger name: `"workflow.log"`
- Use hierarchical names: `"com.myapp.orders"`, `"com.workflow.batch"`, etc.
Error Handling:
- Throws `TaskExecutionException` on:
  - Null message (required field)
  - Logging framework failures (rare)
- Invalid log levels are caught during task construction
Message Formatting:
- Supports SLF4J placeholder syntax: `{}`
- Example: `"User {} logged in at {}"`
- Parameters are passed separately to SLF4J for efficient formatting
- Null parameters are handled gracefully
Performance Optimization:
- Level Checking: Task checks if level is enabled before formatting messages
- Lazy Formatting: Message formatting only happens if level is enabled
- No Overhead: When level is disabled, minimal CPU impact
- String Allocation: Each logging call allocates log record (normal SLF4J behavior)
Best Practices:
- Use TRACE for detailed debugging (enable only when troubleshooting)
- Use DEBUG for diagnostic information during development
- Use INFO for important workflow milestones
- Use WARN for recoverable issues that need attention
- Use ERROR for serious failures
- Use meaningful logger names organized by component
- Include relevant context in log messages
- Use parameters instead of string concatenation for efficiency
- Log at critical workflow steps (start, end, failures, decisions)
- Avoid logging in tight loops or high-frequency tasks
- Use different loggers for different concerns (audit, business logic, debugging)
Thread Safety:
- This task is thread-safe
- SLF4J itself is thread-safe
- Multiple threads can log concurrently without coordination
NoOpTask performs no work; it serves as a placeholder for testing and workflow scaffolding.
Context Usage:
- Inputs: None
- Outputs: None
Design Principles:
- Single Responsibility: One task, one job
- Idempotent: Same input → same output, no side effects
- Fail Fast: Validate inputs early
- Clear Errors: Descriptive exception messages
- Document Context: What keys are read/written
- Thread Safety: If used in parallel workflows
- Resource Cleanup: Release resources in finally blocks
- Naming: Descriptive task names
Task Library Summary:
- HTTP Operations: GET, POST, PUT, DELETE with full request/response handling
- File Operations: Read and write with encoding support
- Control Flow: Conditional, switch, delay for branching and timing
- Processing: JavaScript and shell command execution
- Resilience: Retry and timeout decorators
- Composition: Sequential and parallel task composition
- Utilities: Testing and placeholder tasks
Custom tasks should:
- Read inputs from WorkflowContext
- Execute business logic
- Write outputs to WorkflowContext
- Throw TaskExecutionException on failure
- Provide descriptive names for logging
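The responsibilities above can be sketched as a minimal custom task. The Task interface and a tiny WorkflowContext stub are repeated here so the sketch compiles standalone (the stub declares TaskExecutionException unchecked to keep the example short; the engine's real exception is checked):

```java
import java.util.HashMap;
import java.util.Map;

public class CustomTaskSketch {
    // Minimal stand-ins for the engine's types, for a self-contained sketch.
    static class WorkflowContext {
        private final Map<String, Object> data = new HashMap<>();
        public void put(String key, Object value) { data.put(key, value); }
        public Object get(String key) { return data.get(key); }
    }
    static class TaskExecutionException extends RuntimeException {
        public TaskExecutionException(String msg) { super(msg); }
    }
    interface Task {
        void execute(WorkflowContext context) throws TaskExecutionException;
        String getName();
    }

    // A custom task: reads "input", writes the upper-cased value to "output".
    static class UpperCaseTask implements Task {
        @Override
        public void execute(WorkflowContext context) {
            Object raw = context.get("input");
            if (raw == null) {
                // Fail fast with a clear, descriptive error, per the principles above.
                throw new TaskExecutionException("UpperCaseTask: missing context key 'input'");
            }
            context.put("output", raw.toString().toUpperCase());
        }
        @Override
        public String getName() { return "UpperCaseTask"; }
    }

    public static void main(String[] args) {
        WorkflowContext context = new WorkflowContext();
        context.put("input", "hello");
        new UpperCaseTask().execute(context);
        System.out.println(context.get("output")); // HELLO
    }
}
```

The task is idempotent and keeps its context contract (reads "input", writes "output") explicit, which is what the documentation guidance above asks for.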
Executes database stored procedures or functions using JDBC CallableStatement.
Purpose: Invoke database stored procedures and functions with IN/OUT/INOUT parameters
Builder Configuration:
```java
JdbcCallableTask.builder()
    .dataSource(DataSource)               // JDBC DataSource (required)
    .call(String)                         // Call statement directly (optional)
    .inParameters(Map<Integer, Object>)   // IN parameters directly (optional)
    .outParameters(Map<Integer, Integer>) // OUT parameters directly (optional)
    .readingCallFrom(String)              // Context key for call statement (optional)
    .readingInParametersFrom(String)      // Context key for IN parameters (optional)
    .readingOutParametersFrom(String)     // Context key for OUT parameters (optional)
    .writingResultSetsTo(String)          // Context key for result sets (optional)
    .writingOutValuesTo(String)           // Context key for OUT values (optional)
    .build()
```
Examples:
```java
public void example() {
    // Simple stored procedure with IN parameters
    Map<Integer, Object> inParams = new HashMap<>();
    inParams.put(1, 101); // product_id
    inParams.put(2, 50);  // quantity
    JdbcCallableTask task = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .call("{call update_inventory(?, ?)}")
        .inParameters(inParams)
        .build();
    WorkflowContext context = new WorkflowContext();
    task.execute(context);

    // Function with return value
    // CREATE FUNCTION calculate_tax(amount DECIMAL) RETURNS DECIMAL
    Map<Integer, Object> funcInParams = new HashMap<>();
    funcInParams.put(2, new BigDecimal("100.00")); // amount (position 2)
    Map<Integer, Integer> funcOutParams = new HashMap<>();
    funcOutParams.put(1, Types.DECIMAL); // Return value at position 1
    JdbcCallableTask funcTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .call("{? = call calculate_tax(?)}")
        .inParameters(funcInParams)
        .outParameters(funcOutParams)
        .writingOutValuesTo("taxResult")
        .build();
    funcTask.execute(context);
    Map<Integer, Object> outValues = context.get("taxResult");
    BigDecimal tax = (BigDecimal) outValues.get(1);
    System.out.println("Tax: " + tax);

    // Procedure with OUT parameters
    // CREATE PROCEDURE get_user_stats(IN user_id INT, OUT total_orders INT, OUT total_spent DECIMAL)
    Map<Integer, Object> statsInParams = new HashMap<>();
    statsInParams.put(1, 123); // user_id
    Map<Integer, Integer> statsOutParams = new HashMap<>();
    statsOutParams.put(2, Types.INTEGER); // total_orders
    statsOutParams.put(3, Types.DECIMAL); // total_spent
    JdbcCallableTask statsTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .call("{call get_user_stats(?, ?, ?)}")
        .inParameters(statsInParams)
        .outParameters(statsOutParams)
        .writingOutValuesTo("userStats")
        .build();
    statsTask.execute(context);
    Map<Integer, Object> stats = context.get("userStats");
    Integer totalOrders = (Integer) stats.get(2);
    BigDecimal totalSpent = (BigDecimal) stats.get(3);

    // INOUT parameters
    // CREATE PROCEDURE increment_counter(INOUT counter INT)
    Map<Integer, Object> inoutInParams = new HashMap<>();
    inoutInParams.put(1, 10); // Initial value
    Map<Integer, Integer> inoutOutParams = new HashMap<>();
    inoutOutParams.put(1, Types.INTEGER); // Register as OUT
    JdbcCallableTask inoutTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .call("{call increment_counter(?)}")
        .inParameters(inoutInParams)
        .outParameters(inoutOutParams)
        .writingOutValuesTo("counterResult")
        .build();
    inoutTask.execute(context);
    Map<Integer, Object> result = context.get("counterResult");
    Integer newValue = (Integer) result.get(1);

    // Procedure with result sets
    // CREATE PROCEDURE get_order_details(IN order_id INT)
    // Returns multiple result sets
    Map<Integer, Object> orderParams = new HashMap<>();
    orderParams.put(1, 456);
    JdbcCallableTask resultSetTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .call("{call get_order_details(?)}")
        .inParameters(orderParams)
        .writingResultSetsTo("orderDetails")
        .build();
    resultSetTask.execute(context);
    // Result sets are List<List<Map<String, Object>>>
    List<List<Map<String, Object>>> resultSets = context.get("orderDetails");
    // First result set: order header
    List<Map<String, Object>> orderHeader = resultSets.get(0);
    Map<String, Object> order = orderHeader.getFirst();
    // Second result set: order items
    List<Map<String, Object>> orderItems = resultSets.get(1);

    // Context Mode - Dynamic procedure calls
    context.put("procCall", "{call process_payment(?, ?, ?)}");
    Map<Integer, Object> paymentParams = new HashMap<>();
    paymentParams.put(1, 789);                     // user_id
    paymentParams.put(2, new BigDecimal("49.99")); // amount
    paymentParams.put(3, "USD");                   // currency
    context.put("paymentParams", paymentParams);
    Map<Integer, Integer> paymentOutParams = new HashMap<>();
    paymentOutParams.put(4, Types.VARCHAR); // transaction_id
    context.put("paymentOutParams", paymentOutParams);
    JdbcCallableTask contextTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .readingCallFrom("procCall")
        .readingInParametersFrom("paymentParams")
        .readingOutParametersFrom("paymentOutParams")
        .writingOutValuesTo("paymentResult")
        .build();
    contextTask.execute(context);
}

// Complex workflow with stored procedures
public Workflow buildOrderProcessingWorkflow(DataSource dataSource) {
    // Validate order procedure
    JdbcCallableTask validateTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .readingCallFrom("validateCall")
        .readingInParametersFrom("validateParams")
        .readingOutParametersFrom("validateOut")
        .writingOutValuesTo("validationResult")
        .build();

    // Process order procedure
    JdbcCallableTask processTask = JdbcCallableTask.builder()
        .dataSource(dataSource)
        .readingCallFrom("processCall")
        .readingInParametersFrom("processParams")
        .writingOutValuesTo("processResult")
        .build();

    return SequentialWorkflow.builder()
        .name("ProcessOrder")
        .task(context -> {
            // Prepare validation
            Integer orderId = context.getTyped("orderId", Integer.class);
            context.put("validateCall", "{call validate_order(?, ?)}");
            Map<Integer, Object> validateParams = new HashMap<>();
            validateParams.put(1, orderId);
            context.put("validateParams", validateParams);
            Map<Integer, Integer> validateOut = new HashMap<>();
            validateOut.put(2, Types.BOOLEAN); // is_valid
            context.put("validateOut", validateOut);
        })
        .task(validateTask)
        .task(context -> {
            Map<Integer, Object> validationResult = context.get("validationResult");
            Boolean isValid = (Boolean) validationResult.get(2);
            if (!isValid) {
                throw new TaskExecutionException("Order validation failed");
            }
            // Prepare processing
            Integer orderId = context.getTyped("orderId", Integer.class);
            context.put("processCall", "{call process_order(?)}");
            Map<Integer, Object> processParams = new HashMap<>();
            processParams.put(1, orderId);
            context.put("processParams", processParams);
        })
        .task(processTask)
        .build();
}
```
Executes multiple tasks within a single database transaction with automatic commit or rollback.
Purpose: Ensures ACID properties for multiple database operations - all succeed or all rollback
Builder Configuration:
```java
JdbcTransactionTask.builder()
    .dataSource(DataSource)    // JDBC DataSource (required)
    .task(Task)                // Add task to transaction (multiple)
    .readingTasksFrom(String)  // Context key for task list (optional)
    .isolationLevel(int)       // Transaction isolation level (optional)
    .build()
```
Executes system shell commands.
Purpose: System integration, external tools
Builder Configuration:
```java
ShellCommandTask.builder()
    .command(String...)          // Command and arguments
    .workingDirectory(Path)      // Working directory
    .stdoutContextKey(String)    // Key for stdout
    .stderrContextKey(String)    // Key for stderr
    .exitCodeContextKey(String)  // Key for exit code
    .build()
```
Examples:
// Run script
ShellCommandTask task = ShellCommandTask.builder()
    .command("python3", "process.py", "input.json")
    .workingDirectory(Path.of("/opt/scripts"))
    .stdoutContextKey("scriptOutput")
    .stderrContextKey("scriptErrors")
    .build();

// Git operations
ShellCommandTask gitClone = ShellCommandTask.builder()
    .command("git", "clone", repoUrl)
    .workingDirectory(Path.of("/tmp"))
    .build();

// Data processing
ShellCommandTask csvProcess = ShellCommandTask.builder()
    .command("awk", "-F,", "{print $1,$3}", "data.csv")
    .stdoutContextKey("processedCsv")
    .build();
Context Usage:
Security Considerations:
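Under the hood, a shell task boils down to `ProcessBuilder`: start the process, drain stdout before waiting (so the pipe buffer cannot fill and deadlock the child), then collect the exit code. A minimal plain-JDK sketch of that capture pattern (not the engine's internals):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ShellCapture {

    // Run a command, return trimmed stdout; fail on a non-zero exit code.
    public static String run(String... command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).start();
        // Drain stdout fully BEFORE waitFor(), or a chatty child can block
        // forever on a full pipe buffer.
        String stdout = new String(process.getInputStream().readAllBytes(),
                                   StandardCharsets.UTF_8);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("Command failed with exit code " + exitCode);
        }
        return stdout.trim();
    }
}
```

Note that passing the command as separate arguments, as `ShellCommandTask.command(String...)` does, avoids shell interpolation entirely, which is the main defense against command injection.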
Introduces a deliberate pause in workflow execution.
Purpose: Rate limiting, pacing requests, waiting for processes, testing timeout behaviors
Features:
Constructor:
DelayTask(long millis);
Examples:
// Simple delay
Task delayTask = new DelayTask(2000); // 2 second delay
WorkflowContext context = new WorkflowContext();
delayTask.execute(context);

// Rate limiting in sequential workflow
SequentialWorkflow rateLimitedPipeline = SequentialWorkflow.builder()
    .name("RateLimitedApi")
    .task(new GetHttpTask.Builder<>(client)
        .url("https://api.example.com/request1")
        .build())
    .task(new DelayTask(1000)) // 1 second rate limiting
    .task(new GetHttpTask.Builder<>(client)
        .url("https://api.example.com/request2")
        .build())
    .task(new DelayTask(1000)) // 1 second rate limiting
    .task(new GetHttpTask.Builder<>(client)
        .url("https://api.example.com/request3")
        .build())
    .build();
WorkflowResult result = rateLimitedPipeline.execute(context);

// Retry backoff pattern
public Workflow buildRetryWorkflow(Workflow action) {
    return SequentialWorkflow.builder()
        .task(action)
        .task(new DelayTask(5000))  // 5 second backoff
        .task(action)
        .task(new DelayTask(10000)) // 10 second backoff
        .task(action)
        .build();
}

// Polling with intervals
public Workflow buildPollingWorkflow() {
    return SequentialWorkflow.builder()
        .name("PollingWorkflow")
        .task(context -> checkExternalStatus(context)) // Check status
        .task(new DelayTask(2000))                     // Wait 2 seconds
        .task(context -> checkExternalStatus(context)) // Check again
        .task(new DelayTask(5000))                     // Wait longer
        .task(context -> checkExternalStatus(context)) // Final check
        .build();
}
Context Usage:
Error Handling:
Performance Implications:
Best Practices:
Thread Safety:
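A delay task ultimately reduces to `Thread.sleep` plus careful interrupt handling. The sketch below illustrates the usual pattern (restore the interrupt flag, then surface the failure); it matches what the docs describe but is an illustration, not the engine's source:

```java
public class DelaySketch {

    // Sleep for the given duration; treat interruption as a failure
    // while preserving the thread's interrupt status for callers.
    public static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // don't swallow the interrupt
            throw new RuntimeException("Delay interrupted", e);
        }
    }
}
```

Re-setting the interrupt flag matters in a workflow engine: an executor cancelling the workflow relies on that flag to stop subsequent tasks promptly.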
Logs messages to SLF4J at configurable log levels.
Purpose: Instrument workflows with logging for monitoring and debugging
Features:
Log Levels:
LogTask.LogLevel.TRACE - Finest-grained informational messages
LogTask.LogLevel.DEBUG - Detailed diagnostic information
LogTask.LogLevel.INFO  - General informational messages (default)
LogTask.LogLevel.WARN  - Warning messages for potentially harmful situations
LogTask.LogLevel.ERROR - Error messages with optional exception

Builder Configuration:
LogTask.builder()
    .message(String)        // Log message template (required)
    .parameters(Object...)  // Message parameters (optional)
    .level(LogLevel)        // Log level (default: INFO)
    .throwable(Throwable)   // Exception to log (optional)
    .loggerName(String)     // Logger name (default: "workflow.log")
    .logger(Logger)         // Pre-configured SLF4J Logger (optional)
    .build()

Examples:
// Basic informational logging
LogTask infoLog = LogTask.builder()
    .message("Workflow started")
    .level(LogTask.LogLevel.INFO)
    .build();

WorkflowContext context = new WorkflowContext();
infoLog.execute(context);

// Debug logging with parameters
LogTask debugLog = LogTask.builder()
    .message("Processing user {} with order {}")
    .parameters("user-123", "order-456")
    .level(LogTask.LogLevel.DEBUG)
    .loggerName("com.myapp.orders")
    .build();
debugLog.execute(context);

// Error logging with exception
Exception error = new RuntimeException("Database connection failed");
LogTask errorLog = LogTask.builder()
    .message("Failed to load user data")
    .level(LogTask.LogLevel.ERROR)
    .throwable(error)
    .build();
errorLog.execute(context);

// Dynamic message from context
public Workflow buildLoggingWorkflow() {
    return SequentialWorkflow.builder()
        .name("DataProcessingWithLogging")
        .task(context -> {
            // Setup
            context.put("batchSize", 100);
            context.put("processingLevel", LogTask.LogLevel.DEBUG);
        })
        .task(LogTask.builder()
            .message("Starting batch processing")
            .level(LogTask.LogLevel.INFO)
            .loggerName("com.workflow.batch")
            .build())
        .task(context -> {
            // Processing
            int processed = 0;
            context.put("processedCount", processed);
        })
        .task(LogTask.builder()
            .message("Processed {} items")
            .parameters(100)
            .level(LogTask.LogLevel.INFO)
            .build())
        .build();
}

// Audit logging
LogTask auditLog = LogTask.builder()
    .message("User {} accessed resource {} at {}")
    .parameters("admin", "financial_report", System.currentTimeMillis())
    .level(LogTask.LogLevel.WARN)
    .loggerName("com.myapp.audit")
    .build();

// Conditional logging in workflow
public Workflow buildConditionalLoggingWorkflow() {
    return SequentialWorkflow.builder()
        .task(context -> {
            Integer errorCount = context.getTyped("errorCount", Integer.class);
            if (errorCount != null && errorCount > 10) {
                LogTask alertLog = LogTask.builder()
                    .message("High error rate detected: {} errors")
                    .parameters(errorCount)
                    .level(LogTask.LogLevel.ERROR)
                    .loggerName("com.workflow.alerts")
                    .build();
                alertLog.execute(context);
            }
        })
        .build();
}

// Trace logging for detailed debugging
LogTask traceLog = LogTask.builder()
    .message("Variable state: key={}, value={}, type={}")
    .parameters("userId", "user-123", "String")
    .level(LogTask.LogLevel.TRACE)
    .loggerName("com.myapp.debug")
    .build();
Context Usage:
Logger Name Hierarchy:
Error Handling:
Message Formatting:
Performance Considerations:
Best Practices:
Thread Safety:
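Message templates use SLF4J's `{}` placeholders, filled left-to-right from the `parameters(...)` varargs. To make the convention concrete, here is a tiny stand-in formatter, not SLF4J's implementation (the real one also handles escaped braces and array arguments):

```java
public class BraceFormat {

    // Substitute each "{}" in the template with the next argument, in order.
    public static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        int argIndex = 0;
        int at;
        while ((at = template.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            sb.append(template, from, at);   // literal text before the placeholder
            sb.append(args[argIndex++]);     // next argument, left to right
            from = at + 2;                   // skip past "{}"
        }
        return sb.append(template.substring(from)).toString();
    }
}
```

So `"Processing user {} with order {}"` with parameters `"user-123", "order-456"` renders as `Processing user user-123 with order order-456`. The key benefit over string concatenation is that the substitution cost is only paid when the level is actually enabled.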
Does nothing - useful for testing and placeholders.
Purpose: Testing, placeholders
Examples:
public void example() {
    // Placeholder in development
    Task placeholder = new NoOpTask();

    // Testing workflow structure
    SequentialWorkflow.builder()
        .task(new NoOpTask())
        .task(new NoOpTask())
        .task(new NoOpTask())
        .build();

    // Conditional default
    Task defaultBranch = new NoOpTask();
}
Context Usage:
Simple Task Template:
public class MyTask extends AbstractTask {

    private final String config;

    public MyTask(String config) {
        this.config = config;
    }

    @Override
    protected void doExecute(WorkflowContext context) throws TaskExecutionException {
        // 1. Read inputs from context
        String input = context.getTyped("inputKey", String.class);

        // 2. Validate inputs
        if (input == null || input.isEmpty()) {
            throw new TaskExecutionException("Input is required");
        }

        // 3. Execute business logic
        String result = processData(input, config);

        // 4. Write outputs to context
        context.put("outputKey", result);
    }

    @Override
    public String getName() {
        return "MyTask";
    }

    private String processData(String input, String config) {
        // Implementation
        return input.toUpperCase();
    }
}
public class SlackNotificationTask extends AbstractTask {

    private final String webhookUrl;
    private final HttpClient httpClient;

    public SlackNotificationTask(String webhookUrl, HttpClient httpClient) {
        this.webhookUrl = webhookUrl;
        this.httpClient = httpClient;
    }

    @Override
    protected void doExecute(WorkflowContext context) throws TaskExecutionException {
        // Read message from context
        String message = context.getTyped("notificationMessage", String.class);
        if (message == null) {
            throw new TaskExecutionException("Message is required");
        }

        // Build Slack payload
        String payload = String.format("{\"text\":\"%s\"}", message);

        // Send notification
        try {
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(webhookUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

            HttpResponse<String> response = httpClient.send(
                request, HttpResponse.BodyHandlers.ofString()
            );

            if (response.statusCode() != 200) {
                throw new TaskExecutionException(
                    "Slack notification failed: " + response.body()
                );
            }

            // Store response
            context.put("notificationSent", true);
            context.put("notificationTimestamp", Instant.now());
        } catch (IOException | InterruptedException e) {
            throw new TaskExecutionException("Failed to send notification", e);
        }
    }

    @Override
    public String getName() {
        return "SlackNotification";
    }
}
public void example() {
    SequentialWorkflow.builder()
        .task(task1)
        .task(task2)
        .task(task3)
        .build();
}
public void example() {
    ParallelWorkflow.builder()
        .task(task1)
        .task(task2)
        .task(task3)
        .build();
}
The Workflow Engine provides a comprehensive set of tasks covering HTTP, file, database, processing, timing, logging, and utility operations.
All tasks follow consistent patterns: they read inputs from and write outputs to the shared WorkflowContext, and signal failure by throwing TaskExecutionException.
Tasks are composable, testable, and designed for both simple and complex workflow scenarios.
Error Handling:
Best Practices:
Executes SQL SELECT queries and processes results in a streaming fashion without loading all rows into memory.
Purpose: Efficiently process large database result sets that don't fit in memory
Features:
Builder Configuration:
JdbcStreamingQueryTask.builder()
    .dataSource(DataSource)                      // JDBC DataSource (required)
    .sql(String)                                 // SQL query directly (optional)
    .params(List<Object>)                        // Parameters directly (optional)
    .rowCallback(Consumer<Map<String, Object>>)  // Callback directly (optional)
    .readingSqlFrom(String)                      // Context key for SQL (optional)
    .readingParamsFrom(String)                   // Context key for parameters (optional)
    .readingRowCallbackFrom(String)              // Context key for callback (optional)
    .writingRowCountTo(String)                   // Context key for row count (required)
    .fetchSize(int)                              // Fetch size optimization (default: 1000)
    .queryTimeout(int)                           // Query timeout in seconds (optional)
    .build()

Examples:
public void example() {
    // Direct Mode - Print each row
    Consumer<Map<String, Object>> printCallback = row -> {
        Integer id = (Integer) row.get("id");
        String name = (String) row.get("name");
        System.out.println("User ID: " + id + ", Name: " + name);
    };

    JdbcStreamingQueryTask task = JdbcStreamingQueryTask.builder()
        .dataSource(dataSource)
        .sql("SELECT id, name, email FROM users WHERE active = ?")
        .params(List.of(true))
        .rowCallback(printCallback)
        .writingRowCountTo("processedCount")
        .fetchSize(500)
        .build();

    WorkflowContext context = new WorkflowContext();
    task.execute(context);

    Long totalProcessed = context.get("processedCount");
    System.out.println("Processed " + totalProcessed + " rows");

    // Context Mode - Dynamic callback
    context.put("querySql", "SELECT * FROM orders WHERE created_at >= ?");
    context.put("queryParams", List.of(LocalDate.now().minusDays(30)));

    Consumer<Map<String, Object>> orderCallback = row -> {
        // Process each order
        Integer orderId = (Integer) row.get("id");
        BigDecimal total = (BigDecimal) row.get("total");
        processOrder(orderId, total);
    };
    context.put("orderCallback", orderCallback);

    JdbcStreamingQueryTask contextTask = JdbcStreamingQueryTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("querySql")
        .readingParamsFrom("queryParams")
        .readingRowCallbackFrom("orderCallback")
        .writingRowCountTo("ordersProcessed")
        .fetchSize(1000)
        .build();
    contextTask.execute(context);

    // Export to CSV
    try (CSVWriter writer = new CSVWriter(new FileWriter("export.csv"))) {
        // Write header
        writer.writeNext(new String[]{"id", "name", "email", "created_at"});

        Consumer<Map<String, Object>> csvCallback = row -> {
            String[] values = {
                String.valueOf(row.get("id")),
                String.valueOf(row.get("name")),
                String.valueOf(row.get("email")),
                String.valueOf(row.get("created_at"))
            };
            writer.writeNext(values);
        };

        JdbcStreamingQueryTask exportTask = JdbcStreamingQueryTask.builder()
            .dataSource(dataSource)
            .sql("SELECT id, name, email, created_at FROM users")
            .params(Collections.emptyList())
            .rowCallback(csvCallback)
            .writingRowCountTo("exportedRows")
            .fetchSize(1000)
            .build();
        exportTask.execute(context);
    }

    // Aggregate processing with state
    AtomicInteger activeCount = new AtomicInteger(0);
    AtomicLong totalRevenue = new AtomicLong(0);

    Consumer<Map<String, Object>> aggregateCallback = row -> {
        String status = (String) row.get("status");
        BigDecimal amount = (BigDecimal) row.get("amount");
        if ("ACTIVE".equals(status)) {
            activeCount.incrementAndGet();
        }
        totalRevenue.addAndGet(amount.longValue());
    };

    JdbcStreamingQueryTask aggregateTask = JdbcStreamingQueryTask.builder()
        .dataSource(dataSource)
        .sql("SELECT status, amount FROM orders")
        .params(Collections.emptyList())
        .rowCallback(aggregateCallback)
        .writingRowCountTo("totalOrders")
        .fetchSize(2000)
        .build();
    aggregateTask.execute(context);

    System.out.println("Active: " + activeCount.get());
    System.out.println("Total Revenue: " + totalRevenue.get());

    // Early termination - Stop after finding target
    AtomicBoolean found = new AtomicBoolean(false);

    Consumer<Map<String, Object>> findCallback = row -> {
        if (found.get()) {
            return; // Skip processing if already found
        }
        String email = (String) row.get("email");
        if ("target@example.com".equals(email)) {
            found.set(true);
            context.put("targetUser", row);
            throw new RuntimeException("Found target user"); // Stop processing
        }
    };

    try {
        JdbcStreamingQueryTask findTask = JdbcStreamingQueryTask.builder()
            .dataSource(dataSource)
            .sql("SELECT * FROM users ORDER BY created_at")
            .params(Collections.emptyList())
            .rowCallback(findCallback)
            .writingRowCountTo("scannedRows")
            .build();
        findTask.execute(context);
    } catch (TaskExecutionException e) {
        if (found.get()) {
            System.out.println("Found target user");
        } else {
            throw e;
        }
    }

    // Batch processing within stream
    List<Map<String, Object>> batch = new ArrayList<>();
    int batchSize = 100;

    Consumer<Map<String, Object>> batchCallback = row -> {
        batch.add(row);
        if (batch.size() >= batchSize) {
            processBatch(batch);
            batch.clear();
        }
    };

    JdbcStreamingQueryTask batchTask = JdbcStreamingQueryTask.builder()
        .dataSource(dataSource)
        .sql("SELECT * FROM events WHERE date >= ?")
        .params(List.of(LocalDate.now().minusDays(7)))
        .rowCallback(batchCallback)
        .writingRowCountTo("eventsProcessed")
        .fetchSize(1000)
        .queryTimeout(300)
        .build();
    batchTask.execute(context);

    // Process remaining items in batch
    if (!batch.isEmpty()) {
        processBatch(batch);
        batch.clear();
    }
}
Context Usage:
Error Handling:
Performance Considerations:
Best Practices:
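The `Consumer<Map<String, Object>>` contract above can be demonstrated without a database: rows arrive one at a time, and only the state the callback itself accumulates is retained, which is what keeps memory flat regardless of result-set size. A runnable sketch of that loop (a stand-in for the task's internals, not the engine's code):

```java
import java.util.Map;
import java.util.function.Consumer;

public class StreamingSketch {

    // Feed each row to the callback in turn and return the count,
    // mirroring the writingRowCountTo(...) output key.
    public static long forEachRow(Iterable<Map<String, Object>> rows,
                                  Consumer<Map<String, Object>> callback) {
        long count = 0;
        for (Map<String, Object> row : rows) {
            callback.accept(row); // only one row is held at a time
            count++;
        }
        return count;
    }
}
```

With a real `ResultSet`, the `rows` iterable would be the driver's cursor and `fetchSize` would control how many rows are buffered per round trip; the aggregation pattern (accumulating into the callback's own state) is the same either way.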
Executes SQL SELECT queries with type-safe result mapping using a custom row mapper.
Purpose: Provides type-safe database query results by converting rows into domain objects
Features:
Builder Configuration:
JdbcTypedQueryTask.<T>builder()
    .dataSource(DataSource)        // JDBC DataSource (required)
    .sql(String)                   // SQL query directly (optional)
    .params(List<Object>)          // Parameters directly (optional)
    .rowMapper(RowMapper<T>)       // Row mapper directly (optional)
    .readingSqlFrom(String)        // Context key for SQL (optional)
    .readingParamsFrom(String)     // Context key for parameters (optional)
    .readingRowMapperFrom(String)  // Context key for row mapper (optional)
    .writingResultsTo(String)      // Context key for results (required)
    .build()

RowMapper Interface:
@FunctionalInterface
public interface RowMapper<T> {
    T mapRow(ResultSet rs, int rowNum) throws SQLException;
}
Examples:
public record User(Integer id, String name, String email, LocalDate createdAt) {}

public record Order(Integer id, String orderNumber, User user, BigDecimal total) {}

public void example() {
    // Direct Mode - Map to POJO
    RowMapper<User> userMapper = (rs, rowNum) -> new User(
        rs.getInt("id"),
        rs.getString("name"),
        rs.getString("email"),
        rs.getDate("created_at").toLocalDate()
    );

    JdbcTypedQueryTask<User> task = JdbcTypedQueryTask.<User>builder()
        .dataSource(dataSource)
        .sql("SELECT id, name, email, created_at FROM users WHERE active = ?")
        .params(List.of(true))
        .rowMapper(userMapper)
        .writingResultsTo("users")
        .build();

    WorkflowContext context = new WorkflowContext();
    task.execute(context);

    List<User> users = context.get("users");
    for (User user : users) {
        System.out.println(user.name() + " - " + user.email());
    }

    // Context Mode - Dynamic query
    context.put("userQuery", "SELECT id, name, email, created_at FROM users WHERE status = ?");
    context.put("queryParams", List.of("ACTIVE"));
    context.put("mapper", userMapper);

    JdbcTypedQueryTask<User> contextTask = JdbcTypedQueryTask.<User>builder()
        .dataSource(dataSource)
        .readingSqlFrom("userQuery")
        .readingParamsFrom("queryParams")
        .readingRowMapperFrom("mapper")
        .writingResultsTo("activeUsers")
        .build();
    contextTask.execute(context);

    List<User> activeUsers = context.get("activeUsers");

    // Simple scalar mapping
    RowMapper<String> emailMapper = (rs, rowNum) -> rs.getString("email");

    JdbcTypedQueryTask<String> emailTask = JdbcTypedQueryTask.<String>builder()
        .dataSource(dataSource)
        .sql("SELECT email FROM users WHERE department = ?")
        .params(List.of("Engineering"))
        .rowMapper(emailMapper)
        .writingResultsTo("emails")
        .build();
    emailTask.execute(context);

    List<String> emails = context.get("emails");

    // Joined rows mapped to nested objects
    RowMapper<Order> orderMapper = (rs, rowNum) -> {
        User user = new User(
            rs.getInt("user_id"),
            rs.getString("user_name"),
            rs.getString("user_email"),
            rs.getDate("user_created").toLocalDate()
        );
        return new Order(
            rs.getInt("order_id"),
            rs.getString("order_number"),
            user,
            rs.getBigDecimal("total")
        );
    };

    JdbcTypedQueryTask<Order> orderTask = JdbcTypedQueryTask.<Order>builder()
        .dataSource(dataSource)
        .sql("SELECT o.id as order_id, o.order_number, o.total, " +
             "u.id as user_id, u.name as user_name, u.email as user_email, u.created_at as user_created " +
             "FROM orders o JOIN users u ON o.user_id = u.id WHERE o.status = ?")
        .params(List.of("COMPLETED"))
        .rowMapper(orderMapper)
        .writingResultsTo("completedOrders")
        .build();
}

// In a workflow
public Workflow buildUserProcessingWorkflow(DataSource dataSource) {
    RowMapper<User> mapper = (rs, rowNum) -> new User(
        rs.getInt("id"),
        rs.getString("name"),
        rs.getString("email"),
        rs.getDate("created_at").toLocalDate()
    );

    JdbcTypedQueryTask<User> queryTask = JdbcTypedQueryTask.<User>builder()
        .dataSource(dataSource)
        .sql("SELECT * FROM users WHERE status = ?")
        .params(List.of("PENDING"))
        .rowMapper(mapper)
        .writingResultsTo("pendingUsers")
        .build();

    return SequentialWorkflow.builder()
        .name("ProcessUsers")
        .task(queryTask)
        .task(context -> {
            List<User> users = context.get("pendingUsers");
            for (User user : users) {
                processUser(user);
            }
        })
        .build();
}
Context Usage:
Error Handling:
Best Practices:
Executes multiple SQL statements in a batch for improved performance.
Purpose: Bulk insert, update, or delete operations
Features:
Builder Configuration:
JdbcBatchUpdateTask.builder()
    .dataSource(DataSource)          // JDBC DataSource (required)
    .readingSqlFrom(String)          // Context key for SQL template (required)
    .readingBatchParamsFrom(String)  // Context key for List<List<Object>> (required)
    .writingBatchResultsTo(String)   // Context key for int[] results (required)
    .build()

Constructor:
JdbcBatchUpdateTask(
    DataSource dataSource,
    String sqlKey,
    String batchParamsKey,
    String outputKey
);
Examples:
// Batch insert
public void batchInsertLogs() {
    WorkflowContext context = new WorkflowContext();
    context.put("batchSql",
        "INSERT INTO logs (level, message, timestamp) VALUES (?, ?, ?)");

    List<List<Object>> batchData = Arrays.asList(
        Arrays.asList("INFO", "Application started", Timestamp.valueOf(LocalDateTime.now())),
        Arrays.asList("DEBUG", "Processing request", Timestamp.valueOf(LocalDateTime.now())),
        Arrays.asList("ERROR", "Connection failed", Timestamp.valueOf(LocalDateTime.now()))
    );
    context.put("batchData", batchData);

    JdbcBatchUpdateTask task = JdbcBatchUpdateTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("batchSql")
        .readingBatchParamsFrom("batchData")
        .writingBatchResultsTo("batchResults")
        .build();
    task.execute(context);

    int[] results = (int[]) context.get("batchResults");
    int totalInserted = Arrays.stream(results).sum();
    System.out.println("Inserted " + totalInserted + " log entries");
}

// Batch update with loop
public void batchUpdatePrices() {
    List<Product> products = getProductsToUpdate();
    List<List<Object>> updates = new ArrayList<>();
    for (Product p : products) {
        updates.add(Arrays.asList(p.getNewPrice(), p.getId()));
    }

    context.put("updateSql", "UPDATE products SET price = ? WHERE id = ?");
    context.put("priceUpdates", updates);

    JdbcBatchUpdateTask task = new JdbcBatchUpdateTask(
        dataSource, "updateSql", "priceUpdates", "updateResults"
    );
    task.execute(context);

    int[] counts = context.get("updateResults");
}

// Efficient bulk data loading
public void bulkLoadUsers(List<User> users) {
    context.put("insertSql",
        "INSERT INTO users (name, email, status, created_at) VALUES (?, ?, ?, ?)");

    List<List<Object>> userParams = users.stream()
        .map(u -> Arrays.asList(
            u.getName(),
            u.getEmail(),
            u.getStatus(),
            Timestamp.valueOf(LocalDateTime.now())
        ))
        .collect(Collectors.toList());
    context.put("userBatch", userParams);

    JdbcBatchUpdateTask task = JdbcBatchUpdateTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("insertSql")
        .readingBatchParamsFrom("userBatch")
        .writingBatchResultsTo("loadResults")
        .build();
}

// In a data migration workflow
public Workflow buildMigrationWorkflow(DataSource sourceDs, DataSource targetDs) {
    JdbcQueryTask extractTask = JdbcQueryTask.builder()
        .dataSource(sourceDs)
        .readingSqlFrom("extractSql")
        .readingParamsFrom("extractParams")
        .writingResultsTo("sourceData")
        .build();

    JdbcBatchUpdateTask loadTask = JdbcBatchUpdateTask.builder()
        .dataSource(targetDs)
        .readingSqlFrom("loadSql")
        .readingBatchParamsFrom("transformedData")
        .writingBatchResultsTo("loadResults")
        .build();

    return SequentialWorkflow.builder()
        .name("DataMigration")
        .task(context -> {
            context.put("extractSql", "SELECT * FROM legacy_users WHERE migrated = false");
            context.put("extractParams", Collections.emptyList());
        })
        .task(extractTask)
        .task(context -> {
            // Transform data
            List<Map<String, Object>> sourceData = context.get("sourceData");
            List<List<Object>> transformed = sourceData.stream()
                .map(this::transformRow)
                .collect(Collectors.toList());
            context.put("loadSql", "INSERT INTO users (name, email, status) VALUES (?, ?, ?)");
            context.put("transformedData", transformed);
        })
        .task(loadTask)
        .task(context -> {
            int[] results = context.get("loadResults");
            System.out.println("Migrated " + Arrays.stream(results).sum() + " users");
        })
        .build();
}

// Handle empty batch
public void handleEmptyBatch() {
    context.put("sql", "INSERT INTO items (name) VALUES (?)");
    context.put("items", Collections.emptyList()); // Empty batch

    JdbcBatchUpdateTask task = JdbcBatchUpdateTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("sql")
        .readingBatchParamsFrom("items")
        .writingBatchResultsTo("results")
        .build();
    task.execute(context);

    int[] results = context.get("results"); // Will be an empty array: int[0]
}
Context Usage:
Error Handling:
Best Practices:
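When reading the `int[]` written by `writingBatchResultsTo(...)`, note that JDBC's `executeBatch` contract allows a driver to report the sentinel `Statement.SUCCESS_NO_INFO` (-2) for a statement that succeeded with an unknown update count, so summing raw entries can produce misleading totals. A hedged helper (the class name here is illustrative) that sums only the known counts:

```java
import java.sql.Statement;
import java.util.Arrays;

public class BatchResults {

    // Sum per-statement update counts, skipping the JDBC sentinel value
    // Statement.SUCCESS_NO_INFO ("succeeded, count unknown").
    public static int knownTotal(int[] results) {
        return Arrays.stream(results)
                .filter(count -> count != Statement.SUCCESS_NO_INFO)
                .sum();
    }
}
```

For drivers that always report real counts (most do for simple DML), this matches a plain sum; for the others it avoids silently subtracting 2 per unknown-count statement.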
Executes SQL INSERT, UPDATE, or DELETE statements and returns the number of affected rows.
Purpose: Modify database data (insert, update, delete)
Features:
Builder Configuration:
JdbcUpdateTask.builder()
    .dataSource(DataSource)         // JDBC DataSource (required)
    .readingSqlFrom(String)         // Context key for SQL statement (required)
    .readingParamsFrom(String)      // Context key for parameters (required)
    .writingRowsAffectedTo(String)  // Context key for row count (required)
    .build()

Constructor:
JdbcUpdateTask(
    DataSource dataSource,
    String sqlKey,
    String paramsKey,
    String outputKey
);
Examples:
// Update operation
public void updateUser() {
    WorkflowContext context = new WorkflowContext();
    context.put("updateSql",
        "UPDATE users SET status = ?, updated_at = ? WHERE id = ?");
    context.put("updateParams",
        Arrays.asList("INACTIVE", Timestamp.valueOf(LocalDateTime.now()), 101));

    JdbcUpdateTask task = JdbcUpdateTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("updateSql")
        .readingParamsFrom("updateParams")
        .writingRowsAffectedTo("rowsUpdated")
        .build();
    task.execute(context);

    Integer rowsAffected = (Integer) context.get("rowsUpdated");
    if (rowsAffected > 0) {
        System.out.println("User updated successfully");
    }
}

// Insert operation
public void insertUser() {
    context.put("insertSql",
        "INSERT INTO users (name, email, status, created_at) VALUES (?, ?, ?, ?)");
    context.put("insertParams", Arrays.asList(
        "John Doe",
        "john@example.com",
        "ACTIVE",
        Timestamp.valueOf(LocalDateTime.now())
    ));

    JdbcUpdateTask task = new JdbcUpdateTask(
        dataSource, "insertSql", "insertParams", "rowsInserted"
    );
    task.execute(context);

    Integer inserted = context.get("rowsInserted");
}

// Delete operation
public void deleteOldRecords() {
    context.put("deleteSql", "DELETE FROM logs WHERE created_at < ?");
    context.put("deleteParams", List.of(LocalDate.now().minusDays(90)));

    JdbcUpdateTask task = JdbcUpdateTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("deleteSql")
        .readingParamsFrom("deleteParams")
        .writingRowsAffectedTo("rowsDeleted")
        .build();
}

// Conditional update in workflow
public Workflow buildUserActivationWorkflow(DataSource dataSource) {
    JdbcUpdateTask updateTask = JdbcUpdateTask.builder()
        .dataSource(dataSource)
        .readingSqlFrom("activationSql")
        .readingParamsFrom("activationParams")
        .writingRowsAffectedTo("updateCount")
        .build();

    return SequentialWorkflow.builder()
        .name("ActivateUser")
        .task(context -> {
            // Validate and prepare
            Integer userId = context.getTyped("userId", Integer.class);
            context.put("activationSql", "UPDATE users SET status = 'ACTIVE' WHERE id = ?");
            context.put("activationParams", List.of(userId));
        })
        .task(updateTask)
        .task(context -> {
            Integer count = context.get("updateCount");
            if (count == 0) {
                throw new RuntimeException("User not found");
            }
            // Send activation email...
        })
        .build();
}
Context Usage: