Skip to content

Workflow Agent Recipe

Build intelligent workflow automation agents with OpenDXA


Overview

This recipe demonstrates how to build sophisticated workflow automation agents using OpenDXA. Learn to create agents that can orchestrate complex business processes, make intelligent decisions, and adapt to changing conditions.

What You'll Build

A comprehensive workflow automation system that can:

- Orchestrate multi-step business processes
- Make intelligent decisions at workflow branches
- Handle errors and exceptions gracefully
- Adapt workflows based on real-time conditions
- Monitor and report on workflow performance

Quick Start

Basic Workflow Agent

# Configure workflow agent
# Creates an LLM resource (provider "openai", model "gpt-4") and a memory
# resource.
# NOTE(review): neither `llm` nor `workflow_memory` is referenced again in the
# Quick Start code below -- presumably the runtime picks them up implicitly;
# confirm.
llm = create_llm_resource(provider="openai", model="gpt-4")
workflow_memory = create_memory_resource()

# Simple workflow definition
def create_order_workflow(order_data):
    """Run the basic order pipeline and report how far it got.

    The stages run in a fixed order: validate the order, check inventory,
    take payment, create the order record, schedule fulfillment. The first
    stage that fails stops the pipeline.

    Args:
        order_data: Mapping describing the order. The keys read here are
            "items" and "payment_info"; the whole mapping is also embedded
            in the validation prompt and passed to ``create_order_record``.

    Returns:
        On failure, a dict with "status" set to "failed", the name of the
        failing "step", and stage-specific detail fields. On success, a dict
        with "status" set to "completed" plus the order id, the payment
        transaction id and the estimated fulfillment date.
    """

    def failed(step, **details):
        # Every early exit shares the same envelope; only the details differ.
        return {"status": "failed", "step": step, **details}

    # Step 1: validate the order.
    # The prompt lines are kept flush-left so the prompt text is unchanged.
    validation = reason(f"""
 Validate this order data:
 {order_data}

 Check:
 - Required fields are present
 - Data formats are correct
 - Business rules are satisfied

 Return validation status and any issues.
 """)
    if not validation["valid"]:
        return failed("validation", errors=validation["issues"])

    # Step 2: check inventory for the requested items.
    stock = check_inventory(order_data["items"])
    if not stock["available"]:
        return failed(
            "inventory_check",
            message="Insufficient inventory",
            shortfall=stock["shortfall"],
        )

    # Step 3: take payment.
    charge = process_payment(order_data["payment_info"])
    if not charge["success"]:
        return failed("payment_processing", error=charge["error"])

    # Steps 4 and 5: record the order, then schedule its fulfillment.
    transaction_id = charge["transaction_id"]
    order_id = create_order_record(order_data, transaction_id)
    fulfillment = schedule_fulfillment(order_id, order_data["items"])

    log(f"Order workflow completed successfully: {order_id}", level="INFO")

    return {
        "status": "completed",
        "order_id": order_id,
        "payment_transaction": transaction_id,
        "fulfillment_date": fulfillment["estimated_date"],
    }

# Execute workflow
# Sample order: one customer, a single line item and a credit-card payment.
order_data = {
 "customer_id": "12345",
 "items": [{"product_id": "ABC123", "quantity": 2}],
 "payment_info": {"method": "credit_card", "amount": 99.99}
}

# Run the workflow and log the dict it returns: either a "failed" record
# naming the step that stopped it, or the "completed" summary.
result = create_order_workflow(order_data)
log(f"Workflow result: {result}", level="INFO")

Advanced Workflow Engine

# Advanced workflow engine with dynamic routing
class WorkflowEngine:
    """Register workflow definitions and run their steps for each instance.

    A workflow definition is a mapping with a "steps" list; every step is a
    mapping with at least "name" and "type". Each started instance gets a
    context dict, stored in ``active_instances``, that tracks the current
    step index, the per-step results, the status and the shared variables.

    Several hooks are called on ``self`` but are not defined in this class:
    ``determine_branch``, ``execute_loop_step``, ``execute_step_sequence``,
    ``merge_parallel_results``, ``execute_api_action``,
    ``execute_database_action``, ``execute_notification_action`` and
    ``execute_file_action``. They must be supplied (for example by a
    subclass) before the corresponding step or action types can run.
    """

    # Step "type" -> name of the method that executes it. Names are resolved
    # with getattr at call time so a missing hook only fails for its own type.
    _STEP_HANDLERS = {
        "decision": "execute_decision_step",
        "action": "execute_action_step",
        "ai_analysis": "execute_ai_step",
        "parallel": "execute_parallel_step",
        "loop": "execute_loop_step",
    }

    # Action "action_type" -> name of the method that performs it.
    _ACTION_HANDLERS = {
        "api_call": "execute_api_action",
        "database_operation": "execute_database_action",
        "notification": "execute_notification_action",
        "file_operation": "execute_file_action",
    }

    def __init__(self):
        self.workflows = {}          # workflow_id -> workflow definition
        self.active_instances = {}   # instance_id -> execution context
        self.workflow_memory = create_memory_resource()

    def register_workflow(self, workflow_id, workflow_definition):
        """Register a new workflow definition under ``workflow_id``."""
        self.workflows[workflow_id] = workflow_definition
        log(f"Registered workflow: {workflow_id}", level="INFO")

    def start_workflow(self, workflow_id, input_data, instance_id=None):
        """Create a context for a new instance and execute it immediately.

        Args:
            workflow_id: Id of a previously registered workflow.
            input_data: Caller-supplied input, stored under "input_data".
            instance_id: Optional explicit id; a falsy value makes the
                engine generate one.

        Returns:
            The instance context after execution stopped.

        Raises:
            ValueError: If ``workflow_id`` has not been registered.
        """
        if workflow_id not in self.workflows:
            raise ValueError(f"Unknown workflow: {workflow_id}")

        instance_id = instance_id or generate_unique_id()

        # Initialize workflow context
        context = {
            "instance_id": instance_id,
            "workflow_id": workflow_id,
            "input_data": input_data,
            "current_step": 0,
            "step_results": [],
            "status": "running",
            "started_at": get_current_time(),
            "variables": {},
        }

        # Store active instance
        self.active_instances[instance_id] = context

        return self.execute_workflow(instance_id)

    def execute_workflow(self, instance_id):
        """Run steps from the instance's current step until it stops.

        Execution stops when all steps have run ("completed"), a step asks
        to terminate ("terminated") or to wait ("waiting"), or any exception
        is raised ("failed", with the message stored under "error").

        Returns:
            The (mutated) instance context.
        """
        context = self.active_instances[instance_id]
        workflow_def = self.workflows[context["workflow_id"]]

        try:
            while context["current_step"] < len(workflow_def["steps"]):
                step = workflow_def["steps"][context["current_step"]]

                step_result = self.execute_step(step, context)

                context["step_results"].append({
                    "step_name": step["name"],
                    "result": step_result,
                    "timestamp": get_current_time(),
                })

                # Check for workflow control
                action = step_result.get("action")
                if action == "terminate":
                    context["status"] = "terminated"
                    break
                elif action == "branch":
                    # The branch hook decides the next step index; skip the
                    # normal increment below.
                    context["current_step"] = self.determine_branch(
                        step_result, context
                    )
                    continue
                elif action == "wait":
                    context["status"] = "waiting"
                    context["wait_condition"] = step_result["wait_condition"]
                    break

                # Move to next step
                context["current_step"] += 1

            # Only a run that walked past the last step counts as completed;
            # "terminate" and "wait" leave current_step on the stopping step.
            if context["current_step"] >= len(workflow_def["steps"]):
                context["status"] = "completed"
                context["completed_at"] = get_current_time()

        except Exception as e:
            # Top-level boundary: record the failure on the context instead
            # of propagating it to the caller.
            context["status"] = "failed"
            context["error"] = str(e)
            log(f"Workflow {instance_id} failed: {e}", level="ERROR")

        return context

    def execute_step(self, step, context):
        """Dispatch ``step`` to the executor registered for its "type".

        Raises:
            ValueError: If the step type is not recognised.
        """
        step_type = step["type"]
        handler_name = self._STEP_HANDLERS.get(step_type)
        if handler_name is None:
            raise ValueError(f"Unknown step type: {step_type}")
        return getattr(self, handler_name)(step, context)

    def execute_decision_step(self, step, context):
        """Ask the model for a decision and translate it into a step result.

        The step's "decision_prompt" is formatted with ``context`` and
        ``variables``. A mapping result containing "branch" yields a
        "branch" action; any other result yields "continue".
        """
        decision_prompt = step["decision_prompt"].format(
            context=context,
            variables=context["variables"],
        )

        decision = reason(decision_prompt)

        # Only index into the result when it is a mapping. A plain-text
        # answer that happens to contain the word "branch" must not be
        # indexed with a string key.
        if isinstance(decision, dict) and "branch" in decision:
            return {
                "action": "branch",
                "branch": decision["branch"],
                # The reasoning is informational; tolerate its absence.
                "reasoning": decision.get("reasoning"),
            }
        return {
            "action": "continue",
            "decision": decision,
        }

    def execute_action_step(self, step, context):
        """Resolve the step's parameters and run the matching action hook.

        Raises:
            ValueError: If the step's "action_type" is not recognised.
        """
        action_type = step["action_type"]
        parameters = step.get("parameters", {})

        # Substitute variables in parameters
        resolved_params = self.resolve_parameters(parameters, context)

        handler_name = self._ACTION_HANDLERS.get(action_type)
        if handler_name is None:
            raise ValueError(f"Unknown action type: {action_type}")
        return getattr(self, handler_name)(resolved_params)

    def execute_ai_step(self, step, context):
        """Run an AI analysis step and publish its declared output variables.

        The step's "prompt" is formatted with ``context``, ``input_data``
        and ``variables``. Each name in the step's optional
        "output_variables" that is present in the analysis result is copied
        into the context variables.
        """
        analysis_prompt = step["prompt"].format(
            context=context,
            input_data=context["input_data"],
            variables=context["variables"],
        )

        analysis_result = reason(analysis_prompt)

        for var_name in step.get("output_variables", ()):
            if var_name in analysis_result:
                context["variables"][var_name] = analysis_result[var_name]

        return {
            "action": "continue",
            "analysis": analysis_result,
        }

    def execute_parallel_step(self, step, context):
        """Run every branch of a parallel step and merge the results.

        Branches are executed one after another in this implementation.
        Each branch gets a shallow copy of the context, so nested objects
        such as "variables" and "step_results" are shared with the parent.
        """
        parallel_results = []

        for branch in step["branches"]:
            # Shallow copy: only top-level keys such as "branch_name" are
            # private to the branch.
            branch_context = context.copy()
            branch_context["branch_name"] = branch["name"]

            branch_result = self.execute_step_sequence(
                branch["steps"], branch_context
            )
            parallel_results.append({
                "branch": branch["name"],
                "result": branch_result,
            })

        # Merge results based on merge strategy
        merge_strategy = step.get("merge_strategy", "all_complete")
        merged_result = self.merge_parallel_results(
            parallel_results, merge_strategy
        )

        return {
            "action": "continue",
            "parallel_results": parallel_results,
            "merged_result": merged_result,
        }

    def resolve_parameters(self, parameters, context):
        """Return a copy of ``parameters`` with "${name}" placeholders resolved.

        A placeholder is a string of the exact form "${name}". Names are
        looked up in ``context["variables"]`` first, then in
        ``context["input_data"]``. Dotted names such as "customer.email"
        walk nested mappings. Nested dicts and lists are resolved
        recursively. A placeholder that cannot be resolved is kept verbatim,
        and the input mapping is never mutated.
        """
        return {
            key: self._resolve_value(value, context)
            for key, value in parameters.items()
        }

    def _resolve_value(self, value, context):
        """Resolve one parameter value, recursing into dicts and lists."""
        if isinstance(value, dict):
            return {k: self._resolve_value(v, context) for k, v in value.items()}
        if isinstance(value, list):
            return [self._resolve_value(item, context) for item in value]
        # Require the closing brace: stripping the last character blindly
        # would turn "${abc" into a lookup for "ab".
        if (
            isinstance(value, str)
            and value.startswith("${")
            and value.endswith("}")
        ):
            found, resolved = self._lookup_reference(value[2:-1], context)
            if found:
                return resolved
        return value

    @staticmethod
    def _lookup_reference(reference, context):
        """Look ``reference`` up in the variables, then in the input data.

        Returns:
            A ``(found, value)`` pair. ``found`` is needed because a
            resolved value may legitimately be ``None``.
        """
        for scope in (context.get("variables"), context.get("input_data")):
            if not isinstance(scope, dict):
                continue
            # Exact key first so names that contain dots keep working.
            if reference in scope:
                return True, scope[reference]
            current = scope
            for part in reference.split("."):
                if isinstance(current, dict) and part in current:
                    current = current[part]
                else:
                    break
            else:
                return True, current
        return False, None

# Create workflow engine instance
# Module-level engine shared by the later examples: the workflow registration
# and the testing framework both refer to this name.
workflow_engine = WorkflowEngine()

📋 Implementation Steps

Step 1: Define Workflow Structure

# Workflow definition format
def create_workflow_definition(workflow_config):
    """Build a normalised workflow definition from a raw config mapping.

    Required keys ("id", "name", "description", "input_schema",
    "output_schema", "steps") are copied as-is and raise ``KeyError`` when
    missing. Optional keys fall back to defaults: "version" to "1.0",
    "error_handling" and "retry_policy" to an empty dict, and "timeout" to
    3600 (one hour).
    """
    source = workflow_config
    definition = {}

    # Fields are filled in the same order as the keys of the result.
    for field in ("id", "name", "description"):
        definition[field] = source[field]
    definition["version"] = source.get("version", "1.0")
    for field in ("input_schema", "output_schema", "steps"):
        definition[field] = source[field]
    definition["error_handling"] = source.get("error_handling", {})
    definition["timeout"] = source.get("timeout", 3600)  # 1 hour default
    definition["retry_policy"] = source.get("retry_policy", {})

    return definition

# Example: Customer onboarding workflow
# Five steps: AI validation, a risk-based decision, account creation, parallel
# service setup and a welcome notification. No "version", "timeout" or
# "retry_policy" is supplied, so create_workflow_definition fills in its
# defaults for those keys.
customer_onboarding_workflow = create_workflow_definition({
 "id": "customer_onboarding",
 "name": "Customer Onboarding Process",
 "description": "Automated customer onboarding with verification and setup",
 "input_schema": {
 "customer_data": "object",
 "account_type": "string",
 "initial_deposit": "number"
 },
 "output_schema": {
 "customer_id": "string",
 "account_id": "string",
 "onboarding_status": "string"
 },
 "steps": [
 # Step 1 (ai_analysis): the prompt's {input_data} field is filled in by
 # the engine; the names in "output_variables" are copied from the
 # analysis result into the workflow variables when present.
 {
 "name": "validate_customer_data",
 "type": "ai_analysis",
 "prompt": """
 Validate customer data for onboarding:
 {input_data}

 Check:
 - Required fields completeness
 - Data format validity
 - Compliance requirements
 - Risk indicators

 Return validation result with any issues found.
 """,
 "output_variables": ["validation_status", "risk_score"]
 },
 # Step 2 (decision): {variables[risk_score]} is filled in from the
 # workflow variables via str.format.
 {
 "name": "risk_assessment_decision",
 "type": "decision",
 "decision_prompt": """
 Based on risk score {variables[risk_score]}, determine onboarding path:

 - If risk_score < 30: proceed with standard onboarding
 - If 30 <= risk_score < 70: require additional verification
 - If risk_score >= 70: escalate to manual review

 Return the appropriate branch decision.
 """,
 # NOTE(review): these look like target positions in "steps"; with
 # 0-based indexing, 1 would be this decision step itself. Confirm how
 # the engine's determine_branch hook interprets them.
 "branches": {
 "standard": 1,
 "verification": 2,
 "manual_review": 3
 }
 },
 # Step 3 (action): "${name}" placeholders are substituted at run time by
 # WorkflowEngine.resolve_parameters; names it cannot resolve are passed
 # through unchanged.
 {
 "name": "create_customer_account",
 "type": "action",
 "action_type": "api_call",
 "parameters": {
 "endpoint": "/customers",
 "method": "POST",
 "data": "${customer_data}"
 }
 },
 # Step 4 (parallel): two single-step branches, merged with the
 # "all_complete" strategy.
 {
 "name": "setup_initial_services",
 "type": "parallel",
 "branches": [
 {
 "name": "create_banking_account",
 "steps": [
 {
 "name": "create_account",
 "type": "action",
 "action_type": "api_call",
 "parameters": {
 "endpoint": "/accounts",
 "method": "POST",
 "data": {
 "customer_id": "${customer_id}",
 "account_type": "${account_type}",
 "initial_deposit": "${initial_deposit}"
 }
 }
 }
 ]
 },
 {
 "name": "setup_digital_access",
 "steps": [
 {
 "name": "create_login_credentials",
 "type": "action",
 "action_type": "api_call",
 "parameters": {
 "endpoint": "/auth/setup",
 "method": "POST",
 "data": {
 "customer_id": "${customer_id}",
 "email": "${customer_data.email}"
 }
 }
 }
 ]
 }
 ],
 "merge_strategy": "all_complete"
 },
 # Step 5 (action): welcome e-mail sent through the notification action.
 {
 "name": "send_welcome_notification",
 "type": "action",
 "action_type": "notification",
 "parameters": {
 "type": "email",
 "recipient": "${customer_data.email}",
 "template": "welcome_new_customer",
 "data": {
 "customer_name": "${customer_data.name}",
 "account_id": "${account_id}"
 }
 }
 }
 ],
 # NOTE(review): "max_retries" is read by the retry logic in the error
 # handling example; no reader for the other two keys is visible in the
 # code shown here -- confirm their consumer.
 "error_handling": {
 "retry_failed_steps": True,
 "max_retries": 3,
 "escalation_on_failure": True
 }
})

# Register workflow
# The id passed here matches the "id" field of the definition above.
workflow_engine.register_workflow(
 "customer_onboarding",
 customer_onboarding_workflow
)

Step 2: Advanced Workflow Controls

# Conditional branching and loops
def execute_conditional_workflow(workflow_config, input_data):
    """Run ``workflow_config`` with a fresh control-flow context.

    The context starts with the caller's input under "input", plus empty
    "variables", "loop_counters" and "branch_history" containers.

    Returns:
        Whatever ``execute_workflow_with_controls`` returns for that context.
    """
    fresh_context = dict(
        input=input_data,
        variables={},
        loop_counters={},
        branch_history=[],
    )
    return execute_workflow_with_controls(workflow_config, fresh_context)

def execute_workflow_with_controls(workflow_config, context):
    """Walk the workflow's steps, delegating control structures to helpers.

    "loop", "conditional_branch" and "while_loop" steps are handed to their
    dedicated executors, which return the index of the next step to run.
    Every other step runs as a regular step, and the "variables" it reports
    are merged into the context.

    When a step raises, ``handle_step_error`` is consulted: a truthy answer
    moves on to the next step, a falsy one re-raises the exception.

    Returns:
        The same ``context`` object, mutated in place.
    """
    position = 0

    while position < len(workflow_config["steps"]):
        current = workflow_config["steps"][position]

        try:
            kind = current["type"]
            if kind == "loop":
                position = execute_loop_workflow(current, context, position)
            elif kind == "conditional_branch":
                position = execute_conditional_branch(current, context, position)
            elif kind == "while_loop":
                position = execute_while_loop(current, context, position)
            else:
                # Regular step: merge whatever variables it reports.
                outcome = execute_regular_step(current, context)
                context["variables"].update(outcome.get("variables", {}))
                position += 1

        except Exception as e:
            recovered = handle_step_error(current, e, context, workflow_config)
            if not recovered:
                raise e
            # A handled failure moves past the failing step.
            position += 1

    return context

def execute_loop_workflow(loop_step, context, current_index):
    """Run a loop step's body once per resolved item.

    For each item, the loop variable named in the loop config is set in
    ``context["variables"]``, every body step runs as a regular step, and
    the per-loop counter (keyed by the step name) is incremented. After each
    pass an optional "break_condition" is evaluated; a truthy result ends
    the loop early.

    Returns:
        ``current_index + 1``, the index of the step that follows the loop.
    """
    settings = loop_step["loop_config"]
    target_name = settings["variable"]
    sequence = resolve_loop_items(settings["items"], context)

    # The counter is keyed by the step name and restarts at zero each run.
    counter_key = loop_step["name"]
    context["loop_counters"][counter_key] = 0

    for element in sequence:
        context["variables"][target_name] = element

        for child in loop_step["body"]:
            outcome = execute_regular_step(child, context)
            context["variables"].update(outcome.get("variables", {}))

        context["loop_counters"][counter_key] += 1

        # The break condition is checked after the pass, so the body always
        # runs at least once when there are items.
        if "break_condition" in settings and evaluate_condition(
            settings["break_condition"], context
        ):
            break

    return current_index + 1

def execute_conditional_branch(branch_step, context, current_index):
    """Evaluate a branch step's condition and run the matching branch.

    The decision is appended to ``context["branch_history"]``. A truthy
    result runs the steps under "true_branch"; otherwise the steps under
    "false_branch" run when that key exists. Variables reported by each
    executed step are merged into the context.

    Returns:
        ``current_index + 1``.
    """
    predicate = branch_step["condition"]
    verdict = evaluate_condition(predicate, context)

    # Keep an audit trail of the decision before running anything.
    context["branch_history"].append({
        "step": branch_step["name"],
        "condition": predicate,
        "result": verdict,
        "timestamp": get_current_time(),
    })

    if verdict:
        chosen_steps = branch_step["true_branch"]
    elif "false_branch" in branch_step:
        chosen_steps = branch_step["false_branch"]
    else:
        # The false branch is optional; nothing runs without one.
        chosen_steps = ()

    for child in chosen_steps:
        outcome = execute_regular_step(child, context)
        context["variables"].update(outcome.get("variables", {}))

    return current_index + 1

def evaluate_condition(condition, context):
    """Evaluate a workflow condition against the current context.

    Supported ``condition["type"]`` values:

    * "ai_decision": the model judges the condition's description against
      the context variables; the "result" field of its answer is returned.
    * "expression": delegated to ``evaluate_expression``.
    * "comparison": "left" and "right" are resolved and compared with one
      of ==, !=, >, <, >=, <=, "in" or "contains".

    Any other type, or an unrecognised comparison operator, yields ``False``.
    """
    kind = condition["type"]

    if kind == "ai_decision":
        # The prompt lines are kept flush-left so the prompt text is unchanged.
        verdict = reason(f"""
 Evaluate this condition based on current context:

 Condition: {condition["description"]}
 Context variables: {context["variables"]}

 Return true or false with reasoning.
 """)
        return verdict["result"]

    if kind == "expression":
        return evaluate_expression(condition["expression"], context["variables"])

    if kind == "comparison":
        lhs = resolve_value(condition["left"], context)
        rhs = resolve_value(condition["right"], context)
        symbol = condition["operator"]

        # Ordered table of supported operators, scanned with ==.
        comparators = (
            ("==", lambda a, b: a == b),
            ("!=", lambda a, b: a != b),
            (">", lambda a, b: a > b),
            ("<", lambda a, b: a < b),
            (">=", lambda a, b: a >= b),
            ("<=", lambda a, b: a <= b),
            ("in", lambda a, b: a in b),
            ("contains", lambda a, b: b in a),
        )
        for token, compare in comparators:
            if symbol == token:
                return compare(lhs, rhs)

    return False

Step 3: Error Handling and Recovery

# Comprehensive error handling
def implement_workflow_error_handling(workflow_engine):
    """Attach an AI-assisted error handler to ``workflow_engine``.

    The handler is stored as ``workflow_engine.error_handler`` and has the
    signature ``(instance_id, error, step_context)``. It asks the model to
    recommend one of retry / skip / compensate / escalate and runs the
    matching recovery routine.

    Args:
        workflow_engine: Object exposing ``active_instances`` (instance id
            to context), ``workflows`` (workflow id to definition) and
            ``execute_step``.

    Returns:
        The same ``workflow_engine``, for chaining.
    """

    def instance_context(instance_id):
        """Return the live context dict of a workflow instance."""
        # Each recovery routine looks the context up itself: it is local to
        # handle_workflow_error and not visible from sibling functions.
        return workflow_engine.active_instances[instance_id]

    def handle_workflow_error(instance_id, error, step_context):
        """Pick and run a recovery action for a failed step.

        Returns:
            The result of the chosen recovery routine, or ``False`` when the
            recommended action is not recognised.
        """
        context = instance_context(instance_id)
        workflow_def = workflow_engine.workflows[context["workflow_id"]]
        error_policy = workflow_def.get("error_handling", {})

        # Analyze error with AI.
        # The prompt lines are kept flush-left so the prompt text is unchanged.
        error_analysis = reason(f"""
 Analyze this workflow error and recommend recovery action:

 Error: {error}
 Step context: {step_context}
 Workflow context: {context}
 Available recovery options: {error_policy}

 Consider:
 - Error type and severity
 - Step importance and dependencies
 - Recovery feasibility
 - Business impact

 Recommend: retry, skip, compensate, or escalate
 """)

        recovery_action = error_analysis["recommended_action"]

        if recovery_action == "retry":
            return retry_failed_step(instance_id, step_context, error_policy)
        elif recovery_action == "skip":
            return skip_failed_step(instance_id, step_context)
        elif recovery_action == "compensate":
            return execute_compensation(instance_id, step_context, error_policy)
        elif recovery_action == "escalate":
            return escalate_workflow_error(instance_id, error, step_context)
        else:
            return False  # Cannot handle error

    def retry_failed_step(instance_id, step_context, error_policy):
        """Record a retry attempt and wait out the backoff delay.

        The delay grows as ``retry_delay * backoff_factor ** retry_count``
        (defaults 5 and 2). Retry counts are kept per step name under
        ``context["retry_counts"]``.

        Returns:
            ``True`` when another attempt is allowed, ``False`` once
            ``max_retries`` (default 3) has been reached.
        """
        context = instance_context(instance_id)

        max_retries = error_policy.get("max_retries", 3)
        retry_delay = error_policy.get("retry_delay", 5)
        backoff_factor = error_policy.get("backoff_factor", 2)

        step_name = step_context["step"]["name"]
        retry_count = context.get("retry_counts", {}).get(step_name, 0)

        if retry_count >= max_retries:
            log(f"Max retries exceeded for step {step_name}", level="ERROR")
            return False

        # Increment retry count
        context.setdefault("retry_counts", {})[step_name] = retry_count + 1

        # Calculate delay
        delay = retry_delay * (backoff_factor ** retry_count)

        log(f"Retrying step {step_name} (attempt {retry_count + 1}) after {delay}s", level="INFO")

        # Wait and retry
        wait(delay)
        return True

    def execute_compensation(instance_id, step_context, error_policy):
        """Run the policy's compensation steps in order.

        Returns:
            ``True`` when every compensation step ran, ``False`` as soon as
            one of them raises.
        """
        context = instance_context(instance_id)

        for comp_step in error_policy.get("compensation_steps", []):
            try:
                workflow_engine.execute_step(comp_step, context)
                log(f"Executed compensation: {comp_step['name']}", level="INFO")
            except Exception as comp_error:
                # Stop at the first failure; later steps are not attempted.
                log(f"Compensation failed: {comp_error}", level="ERROR")
                return False

        return True

    def escalate_workflow_error(instance_id, error, step_context):
        """Notify human operators and mark the instance as escalated.

        Returns:
            Always ``True``.
        """
        context = instance_context(instance_id)

        escalation_data = {
            "instance_id": instance_id,
            "error": str(error),
            "step_context": step_context,
            "workflow_context": context,
            "timestamp": get_current_time(),
            "priority": determine_escalation_priority(error, context),
        }

        # Send escalation notification
        send_escalation_notification(escalation_data)

        # Mark workflow as requiring human intervention
        context["status"] = "escalated"
        context["escalation_data"] = escalation_data

        return True

    # Attach error handler to workflow engine
    workflow_engine.error_handler = handle_workflow_error

    return workflow_engine

Step 4: Workflow Monitoring and Analytics

# Workflow monitoring and analytics
def implement_workflow_monitoring(workflow_engine):
    """Attach metric tracking, analytics and health checks to the engine.

    Three callables are stored on ``workflow_engine``: ``track_metrics``,
    ``generate_analytics`` and ``monitor_health``. They share one metrics
    store that lives in this function's closure.

    Returns:
        The same ``workflow_engine``.
    """

    # instance_id -> {"events": [...], "performance": {...}, "errors": [...]}
    workflow_metrics = {}

    def track_workflow_metrics(instance_id, event_type, data=None):
        """Record one event for ``instance_id`` and update derived metrics.

        "step_completed" events add ``data["execution_time"]`` to the list
        kept for ``data["step_name"]``. "step_failed" events add an entry to
        the error list.
        """
        record = workflow_metrics.setdefault(
            instance_id,
            {"events": [], "performance": {}, "errors": []},
        )

        entry = {
            "type": event_type,
            "timestamp": get_current_time(),
            "data": data,
        }
        record["events"].append(entry)

        if event_type == "step_completed":
            # Read both fields before touching the store, so a malformed
            # event leaves no empty timing list behind.
            completed_step = data["step_name"]
            elapsed = data["execution_time"]
            record["performance"].setdefault(completed_step, []).append(elapsed)
        elif event_type == "step_failed":
            record["errors"].append({
                "step": data["step_name"],
                "error": data["error"],
                "timestamp": entry["timestamp"],
            })

    def generate_workflow_analytics(time_period="last_24h"):
        """Ask the model for insights on the metrics of ``time_period``."""
        period_metrics = filter_metrics_by_time(workflow_metrics, time_period)

        # The prompt lines are kept flush-left so the prompt text is unchanged.
        return reason(f"""
 Analyze these workflow metrics and generate insights:
 {period_metrics}

 Provide:
 1. Performance summary
 2. Bottleneck identification
 3. Error pattern analysis
 4. Optimization recommendations
 5. Capacity planning insights
 """)

    def monitor_workflow_health():
        """Collect system-level figures and have the model assess them.

        Returns:
            Dict with the raw "metrics", the model's "analysis" and a
            "timestamp".
        """
        instances = workflow_engine.active_instances
        active_workflows = len(instances)
        failed_workflows = sum(
            1 for instance in instances.values()
            if instance["status"] == "failed"
        )

        health_status = {
            "active_workflows": active_workflows,
            "failed_workflows": failed_workflows,
            "success_rate": calculate_success_rate(),
            "average_execution_time": calculate_average_execution_time(),
            "system_load": calculate_system_load(),
        }

        # The prompt lines are kept flush-left so the prompt text is unchanged.
        health_analysis = reason(f"""
 Assess workflow system health:
 {health_status}

 Determine:
 - Overall system health (healthy/warning/critical)
 - Performance trends
 - Capacity concerns
 - Recommended actions
 """)

        return {
            "metrics": health_status,
            "analysis": health_analysis,
            "timestamp": get_current_time(),
        }

    # Expose the monitoring functions on the engine.
    workflow_engine.track_metrics = track_workflow_metrics
    workflow_engine.generate_analytics = generate_workflow_analytics
    workflow_engine.monitor_health = monitor_workflow_health

    return workflow_engine

Advanced Features

Dynamic Workflow Modification

# Dynamic workflow adaptation
def implement_adaptive_workflows(workflow_engine):
    """Attach performance-driven workflow adaptation to the engine.

    The adaptation entry point is stored as ``workflow_engine.adapt_workflow``
    with the signature ``(workflow_id, performance_data)``.

    Returns:
        The same ``workflow_engine``.
    """

    def adapt_workflow_based_on_performance(workflow_id, performance_data):
        """Ask the model for optimisations and apply the ones it suggests.

        Returns:
            The model's analysis, whether or not it contained a
            "suggested_modifications" entry.
        """
        # The prompt lines are kept flush-left so the prompt text is unchanged.
        adaptation_analysis = reason(f"""
 Analyze workflow performance and suggest optimizations:
 {performance_data}

 Consider:
 - Step execution times
 - Error patterns
 - Resource utilization
 - Success rates

 Suggest specific workflow modifications to improve performance.
 """)

        # Apply suggested modifications
        if "suggested_modifications" in adaptation_analysis:
            for modification in adaptation_analysis["suggested_modifications"]:
                apply_workflow_modification(workflow_id, modification)

        return adaptation_analysis

    def apply_workflow_modification(workflow_id, modification):
        """Apply one modification to the registered workflow definition.

        Recognised ``modification["type"]`` values are
        "add_parallel_execution", "add_caching", "modify_retry_policy" and
        "add_early_exit". Any other type is logged as an error and leaves
        the definition untouched.
        """
        workflow_def = workflow_engine.workflows[workflow_id]
        kind = modification["type"]

        if kind == "add_parallel_execution":
            # Add parallel execution to compatible steps
            add_parallel_execution(workflow_def, modification["steps"])
        elif kind == "add_caching":
            # Add caching to expensive operations
            add_step_caching(workflow_def, modification["steps"])
        elif kind == "modify_retry_policy":
            # Adjust retry policies based on failure patterns
            modify_retry_policies(workflow_def, modification["new_policies"])
        elif kind == "add_early_exit":
            # Add early exit conditions for efficiency
            add_early_exit_conditions(workflow_def, modification["conditions"])
        else:
            # Nothing was changed, so do not log it as applied. Log and skip
            # rather than raise, so the remaining suggestions still run.
            log(f"Unknown modification type for {workflow_id}: {kind}", level="ERROR")
            return

        log(f"Applied modification to {workflow_id}: {modification['type']}", level="INFO")

    # Attach adaptation functions
    workflow_engine.adapt_workflow = adapt_workflow_based_on_performance

    return workflow_engine

Multi-Agent Workflow Coordination

# Multi-agent workflow coordination
def create_multi_agent_workflow(agent_configs, coordination_strategy):
    """Build an executor that runs workflow steps across several agents.

    One agent is created per entry of ``agent_configs`` (agent id to
    config).

    Returns:
        A function ``execute_coordinated_workflow(workflow_steps,
        coordination_data)`` that returns a dict mapping each step name to
        the result produced by its assigned agent.
    """
    roster = {
        agent_id: create_agent(config)
        for agent_id, config in agent_configs.items()
    }

    def execute_coordinated_workflow(workflow_steps, coordination_data):
        """Run each step on its assigned agent, threading coordination data.

        Raises:
            ValueError: If a step names an agent that was not configured.
        """
        results_by_step = {}

        for step in workflow_steps:
            owner = step["assigned_agent"]
            if owner not in roster:
                raise ValueError(f"Unknown agent: {owner}")

            # The agent sees the step payload, the current coordination data
            # and the results accumulated so far.
            handoff = {
                "step_data": step["data"],
                "coordination_data": coordination_data,
                "previous_results": results_by_step,
            }
            outcome = roster[owner].execute_workflow_step(step["action"], handoff)
            results_by_step[step["name"]] = outcome

            # Fold the outcome into the coordination data for the next step.
            coordination_data = update_coordination_data(
                coordination_data,
                outcome,
                coordination_strategy,
            )

        return results_by_step

    return execute_coordinated_workflow

📊 Testing and Validation

Workflow Testing Framework

# Comprehensive workflow testing
def create_workflow_testing_framework(engine=None):
    """Create helpers for testing workflows end to end.

    Args:
        engine: Engine whose ``start_workflow`` runs the scenarios. When
            omitted, the module-level ``workflow_engine`` is looked up each
            time a scenario runs.

    Returns:
        Dict with two callables: "test_scenario" (run and validate one
        scenario) and "validate_result" (validate a result against an
        expected outcome).
    """

    def test_workflow_scenario(workflow_id, test_scenario):
        """Run ``workflow_id`` with the scenario's input and report on it.

        Reads "input_data" and "expected_outcome" from ``test_scenario``.

        Returns:
            The report produced by ``generate_test_report``.
        """
        # Setup test environment. The return value is not needed here.
        setup_test_environment(test_scenario)

        # Resolve the engine late so a module-level engine created after the
        # framework is still picked up.
        target_engine = engine if engine is not None else workflow_engine

        # Execute workflow
        result = target_engine.start_workflow(
            workflow_id,
            test_scenario["input_data"],
            instance_id=f"test_{generate_test_id()}",
        )

        # Validate results
        validation = validate_workflow_result(
            result,
            test_scenario["expected_outcome"],
        )

        # Generate test report
        return generate_test_report(test_scenario, result, validation)

    def validate_workflow_result(actual_result, expected_outcome):
        """Ask the model to compare a result with the expected outcome."""
        # The prompt lines are kept flush-left so the prompt text is unchanged.
        return reason(f"""
 Validate workflow execution result:

 Actual result: {actual_result}
 Expected outcome: {expected_outcome}

 Check:
 - Workflow completion status
 - Output data correctness
 - Business rule compliance
 - Performance criteria

 Return detailed validation report.
 """)

    return {
        "test_scenario": test_workflow_scenario,
        "validate_result": validate_workflow_result,
    }

Next Steps

Enhancements

  • Add visual workflow designer
  • Implement workflow versioning
  • Create workflow templates library
  • Add real-time workflow monitoring dashboard
  • Implement workflow performance optimization

Integration Patterns

  • Event-driven workflow triggers
  • External system integrations
  • Human-in-the-loop workflows
  • Workflow composition patterns
  • Microservices orchestration

Ready to automate your workflows? Try the Quick Start example or explore more OpenDXA Recipes.