Design Document: N8n Workflow Management

Overview

This design document outlines the architecture for extending the existing N8n Operator (https://github.com/craigedmunds/n8n-operator) to support declarative workflow management through Kubernetes Custom Resource Definitions (CRDs). The extension will add a new N8nWorkflow CRD and controller logic to the existing operator codebase.

The solution provides Infrastructure as Code capabilities for n8n workflows, enabling version control, GitOps deployment, and automated lifecycle management while maintaining security through Kubernetes-native credential management. This extends the existing operator’s capabilities from managing n8n instances to also managing the workflows within those instances.

Architecture

High-Level Architecture

graph TB
    subgraph "Kubernetes Cluster"
        subgraph "Extended N8n Operator"
            ExistingCRD[N8n CRD - Existing]
            NewCRD[N8nWorkflow CRD - New]
            ExistingController[N8n Controller - Existing]
            WorkflowController[Workflow Controller - New]
            CredMgr[Credential Manager - New]
        end
        
        subgraph "N8n Namespace"
            N8nPod[N8n Instance]
            N8nAPI[N8n API]
        end
        
        subgraph "Secrets"
            K8sSecrets[Kubernetes Secrets]
            ESO[External Secrets Operator]
        end
        
        subgraph "GitOps"
            ArgoCD[ArgoCD]
            Git[Git Repository]
        end
    end
    
    Git --> ArgoCD
    ArgoCD --> ExistingCRD
    ArgoCD --> NewCRD
    ExistingCRD --> ExistingController
    NewCRD --> WorkflowController
    ExistingController --> N8nPod
    WorkflowController --> N8nAPI
    WorkflowController --> CredMgr
    CredMgr --> K8sSecrets
    CredMgr --> N8nAPI
    ESO --> K8sSecrets
    N8nAPI --> N8nPod

Component Interaction Flow

  1. Resource Creation: ArgoCD or kubectl creates N8nWorkflow resources (alongside existing N8n resources)
  2. Controller Watch: New Workflow Controller (added to existing operator) detects resource changes
  3. Validation: Controller validates workflow definition and references to existing n8n instances
  4. Credential Injection: New Credential Manager processes secret references and creates n8n credentials
  5. Workflow Sync: Controller creates/updates workflow in n8n via API
  6. Status Update: Controller updates resource status with sync results
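The six steps above can be reduced to a reconcile skeleton. This is an illustrative Python sketch with assumed dict shapes and helper names; the real controllers are written in Go on controller-runtime:

```python
# The six-step flow as a minimal reconcile function (illustrative only).
def reconcile(resource: dict, n8n_instances: set, api) -> dict:
    # Step 3 - validate the reference to an existing n8n instance
    ref = resource["spec"]["n8nRef"]["name"]
    if ref not in n8n_instances:
        return {"phase": "Failed", "reason": "InstanceNotFound"}
    # Steps 4-5 - credential injection, then workflow sync via the API client
    cred_ids = api.ensure_credentials(resource["spec"].get("credentials", []))
    workflow_id = api.sync_workflow(resource["spec"]["workflow"])
    # Step 6 - status written back to the resource
    return {"phase": "Synced", "workflowId": workflow_id,
            "credentialIds": cred_ids}

class FakeApi:
    """Stand-in for the n8n API client, used here to exercise the flow."""
    def ensure_credentials(self, creds):
        return [c["name"] for c in creds]
    def sync_workflow(self, workflow):
        return "wf-1"

status = reconcile(
    {"spec": {"n8nRef": {"name": "n8n-instance"},
              "workflow": {"name": "Example"},
              "credentials": [{"name": "api-credentials"}]}},
    {"n8n-instance"}, FakeApi())
```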

Integration with Existing Operator:

  • The existing N8n controller continues to manage n8n instances (deployment, database, ingress)
  • The new Workflow controller manages workflows within those instances
  • Both controllers can reference the same n8n instances
  • Shared utilities and API clients can be reused between controllers

Components and Interfaces

N8nWorkflow Custom Resource Definition

The N8nWorkflow CRD defines the schema for declarative workflow management:

apiVersion: n8n.slys.dev/v1alpha1
kind: N8nWorkflow
metadata:
  name: example-workflow
spec:
  n8nRef:
    name: n8n-instance
    namespace: n8n
  workflow:
    name: "Example Workflow"
    active: true
    tags: ["automation", "metrics"]
    nodes: [...]
    connections: {...}
    settings: {...}
  credentials:
  - name: "api-credentials"
    type: "httpBasicAuth"
    secretRef:
      name: "api-secret"
      namespace: "default"
status:
  workflowId: "workflow-123"
  active: true
  lastSync: "2024-01-05T10:30:00Z"
  conditions: [...]

Workflow Controller

The controller implements the standard Kubernetes controller pattern:

Core Responsibilities:

  • Watch N8nWorkflow resources for changes
  • Validate workflow definitions and n8n instance references
  • Orchestrate credential injection and workflow synchronization
  • Maintain resource status and health monitoring
  • Handle resource deletion and cleanup

Controller Logic:

1. Reconcile Loop:
   - Get N8nWorkflow resource
   - Validate n8n instance reference
   - Process credential requirements
   - Sync workflow with n8n API
   - Update resource status
   - Requeue if necessary

2. Error Handling:
   - Retry with exponential backoff
   - Update status with error conditions
   - Log detailed error information

3. Cleanup:
   - Handle resource deletion
   - Remove workflows from n8n
   - Clean up associated credentials

Credential Manager

The Credential Manager handles secure credential injection:

Interface:

type CredentialManager interface {
    ProcessCredentials(ctx context.Context, workflow *N8nWorkflow) error
    CreateN8nCredential(ctx context.Context, n8nClient N8nClient, cred CredentialSpec) (string, error)
    UpdateN8nCredential(ctx context.Context, n8nClient N8nClient, credID string, cred CredentialSpec) error
    DeleteN8nCredential(ctx context.Context, n8nClient N8nClient, credID string) error
}

Credential Processing Flow:

  1. Parse credential specifications from N8nWorkflow
  2. Resolve Kubernetes secret references
  3. Extract credential data from secrets
  4. Create or update credentials in n8n via API
  5. Store credential IDs for workflow reference
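The processing steps above can be sketched as follows. Helper names are illustrative (the real Credential Manager is Go); the payload shape assumes n8n's credential-create endpoint accepts name, type, and data fields:

```python
import base64
from typing import Dict

def decode_secret_data(secret_data: Dict[str, str]) -> Dict[str, str]:
    """Step 3: Kubernetes secret values arrive base64-encoded."""
    return {k: base64.b64decode(v).decode("utf-8")
            for k, v in secret_data.items()}

def build_credential_payload(name: str, cred_type: str,
                             data: Dict[str, str]) -> Dict:
    """Step 4: request body for n8n's credential-create endpoint."""
    return {"name": name, "type": cred_type, "data": data}

# Example: an httpBasicAuth credential resolved from a secret's data field
secret = {
    "user": base64.b64encode(b"admin").decode(),
    "password": base64.b64encode(b"s3cret").decode(),
}
payload = build_credential_payload(
    "api-credentials", "httpBasicAuth", decode_secret_data(secret))
```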

N8n API Client

The N8n API client provides abstraction for n8n REST API operations:

Interface:

type N8nClient interface {
    // Workflow operations
    CreateWorkflow(ctx context.Context, workflow WorkflowDefinition) (*WorkflowResponse, error)
    UpdateWorkflow(ctx context.Context, workflowID string, workflow WorkflowDefinition) (*WorkflowResponse, error)
    DeleteWorkflow(ctx context.Context, workflowID string) error
    GetWorkflow(ctx context.Context, workflowID string) (*WorkflowResponse, error)
    ActivateWorkflow(ctx context.Context, workflowID string, active bool) error
    
    // Credential operations
    CreateCredential(ctx context.Context, credential CredentialDefinition) (*CredentialResponse, error)
    UpdateCredential(ctx context.Context, credentialID string, credential CredentialDefinition) (*CredentialResponse, error)
    DeleteCredential(ctx context.Context, credentialID string) error
}
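To make the interface concrete, here is a minimal Python mirror showing how the operations map onto REST paths. The paths and the activate/deactivate split follow the n8n public API, but treat the exact routes as assumptions:

```python
# Illustrative URL routing for the operations in the Go interface above.
class N8nUrls:
    def __init__(self, base_url: str):
        self.base = base_url.rstrip("/") + "/api/v1"

    def workflow(self, workflow_id: str) -> str:
        # Get/Update/Delete all address the same resource path
        return f"{self.base}/workflows/{workflow_id}"

    def activate(self, workflow_id: str, active: bool) -> str:
        # Activation is a sub-resource action rather than a PATCH
        verb = "activate" if active else "deactivate"
        return f"{self.base}/workflows/{workflow_id}/{verb}"

    def credential(self, credential_id: str) -> str:
        return f"{self.base}/credentials/{credential_id}"

urls = N8nUrls("https://n8n.example.com/")
```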

Workflow Cleanup Tool

The Cleanup Tool provides operational maintenance capabilities for n8n instances, enabling bulk workflow management and cleanup operations.

Implementation Approach:

The cleanup tool is implemented as Python scripts called from Taskfile tasks. This approach provides:

  • Reusability: Python scripts can be imported and reused across different tasks
  • Maintainability: Complex logic is easier to maintain in Python than shell
  • Testing: Python scripts can be unit tested independently
  • Familiar interface: Taskfile provides the user-facing interface

Script Location: k8s-lab/components/n8n/scripts/

Core Functionality:

  1. Paginated Workflow Listing

    • Fetch all workflows across multiple pages
    • Handle n8n API pagination with limit parameter (max 250 per page)
    • Report progress: “Fetching page X, found Y workflows so far…”
    • Continue fetching until all workflows are retrieved
  2. Filtering Capabilities

    • Server-side filtering: Use n8n API name query parameter for efficient filtering
    • Name-based filtering using ?name= query parameter
    • Empty/blank workflow detection (client-side after fetch)
  3. Bulk Deletion Operations

    • Delete multiple workflows in a single operation
    • Report progress per page: “Page X: Deleted Y workflows, Z total so far”
    • Dry-run mode to preview deletions
    • Error handling with continuation (don’t stop on single failure)

Task Interface:

# List all workflows with pagination
task n8n:workflows:list
 
# Clean up workflows by name (server-side filter)
task n8n:workflows:cleanup NAME="Workflow Name"
 
# Clean up blank/empty workflows (client-side filter)
task n8n:workflows:cleanup:blank
 
# Dry-run mode (preview only)
task n8n:workflows:cleanup NAME="Workflow Name" DRY_RUN=true

Python Script Structure:

# scripts/workflow_cleanup.py
 
import requests
import sys
import subprocess
import base64
from typing import List, Dict, Optional
 
class N8nWorkflowManager:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.api_key = self._get_api_key_from_k8s()
        self.headers = {"X-N8N-API-KEY": self.api_key}
    
    def _get_api_key_from_k8s(self) -> str:
        """Retrieve the API key from the n8n-api-credentials Kubernetes secret."""
        try:
            result = subprocess.run(
                ["kubectl", "get", "secret", "n8n-api-credentials", "-n", "n8n",
                 "-o", "jsonpath={.data.apiKey}"],
                capture_output=True,
                text=True,
                check=True
            )
            encoded_key = result.stdout.strip()
            if not encoded_key:
                raise ValueError("Secret n8n-api-credentials has no apiKey field")
            # Decode base64
            return base64.b64decode(encoded_key).decode('utf-8')
        except subprocess.CalledProcessError as e:
            print(f"Error retrieving API key from Kubernetes: {e.stderr}", file=sys.stderr)
            sys.exit(1)
    
    def fetch_all_workflows(self, name: Optional[str] = None) -> List[Dict]:
        """Fetch all workflows with pagination and optional server-side filtering."""
        all_workflows = []
        offset = 0
        limit = 250  # Max allowed by n8n API
        page = 1
        
        while True:
            # Build URL with pagination and filters
            url = f"{self.base_url}/api/v1/workflows"
            params = {"limit": limit, "offset": offset}
            
            # Add server-side name filter if provided
            if name:
                params["name"] = name
            
            print(f"Fetching page {page}, found {len(all_workflows)} workflows so far...", 
                  file=sys.stderr)
            
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            data = response.json()
            
            workflows = data.get("data", [])
            if not workflows:
                break
            
            all_workflows.extend(workflows)
            
            # Check if we got fewer results than limit (last page)
            if len(workflows) < limit:
                break
            
            offset += limit
            page += 1
        
        print(f"Total workflows fetched: {len(all_workflows)}", file=sys.stderr)
        return all_workflows
    
    def filter_workflows(self, workflows: List[Dict], 
                         blank_only: bool = False) -> List[Dict]:
        """Filter workflows by client-side criteria (for blank detection)."""
        filtered = workflows
        
        if blank_only:
            # Empty name or no nodes (client-side filter)
            filtered = [w for w in filtered 
                       if not w.get("name") or len(w.get("nodes", [])) == 0]
        
        return filtered
    
    def delete_workflows(self, workflow_ids: List[str], 
                        dry_run: bool = False) -> Dict[str, int]:
        """Delete workflows with progress reporting."""
        stats = {"deleted": 0, "failed": 0, "total": len(workflow_ids)}
        
        if dry_run:
            print("DRY RUN - Would delete:")
            for wf_id in workflow_ids:
                print(f"  - {wf_id}")
            return stats
        
        for i, wf_id in enumerate(workflow_ids, 1):
            try:
                url = f"{self.base_url}/api/v1/workflows/{wf_id}"
                response = requests.delete(url, headers=self.headers)
                response.raise_for_status()
                stats["deleted"] += 1
                print(f"✓ Deleted workflow {wf_id} ({i}/{stats['total']})")
            except Exception as e:
                stats["failed"] += 1
                print(f"✗ Failed to delete workflow {wf_id}: {e}", file=sys.stderr)
                # Continue processing remaining workflows
        
        return stats
 
def main():
    import argparse
    parser = argparse.ArgumentParser(description="N8n workflow cleanup utility")
    parser.add_argument("--base-url", required=True, help="N8n base URL")
    parser.add_argument("--name", help="Workflow name to filter (server-side)")
    parser.add_argument("--blank-only", action="store_true", 
                       help="Delete only blank workflows (client-side filter)")
    parser.add_argument("--dry-run", action="store_true", 
                       help="Preview deletions without executing")
    
    args = parser.parse_args()
    
    # Create manager - automatically reads API key from kubectl
    manager = N8nWorkflowManager(args.base_url)
    
    # Fetch workflows with optional server-side name filter
    workflows = manager.fetch_all_workflows(name=args.name)
    
    # Apply client-side filters if needed
    filtered = manager.filter_workflows(workflows, blank_only=args.blank_only)
    
    print(f"\nFound {len(filtered)} workflows matching criteria")
    
    if not filtered:
        print("No workflows to delete.")
        return 0
    
    # Delete workflows
    workflow_ids = [w["id"] for w in filtered]
    stats = manager.delete_workflows(workflow_ids, dry_run=args.dry_run)
    
    print(f"\nSummary:")
    print(f"  Total: {stats['total']}")
    print(f"  Deleted: {stats['deleted']}")
    print(f"  Failed: {stats['failed']}")
    
    return 0 if stats['failed'] == 0 else 1
 
if __name__ == "__main__":
    sys.exit(main())

Taskfile Integration:

workflows:cleanup:
  desc: Clean up workflows by name (usage - task n8n:workflows:cleanup NAME="Workflow Name" DRY_RUN=true)
  cmds:
    - cmd: |
        python3 scripts/workflow_cleanup.py \
          --base-url https://n8n.lab.local.ctoaas.co \
          {{if .NAME}}--name "{{.NAME}}"{{end}} \
          {{if .DRY_RUN}}--dry-run{{end}}
      dir: ../../../k8s-lab/components/n8n
 
workflows:cleanup:blank:
  desc: Clean up blank/empty workflows (usage - task n8n:workflows:cleanup:blank DRY_RUN=true)
  cmds:
    - cmd: |
        python3 scripts/workflow_cleanup.py \
          --base-url https://n8n.lab.local.ctoaas.co \
          --blank-only \
          {{if .DRY_RUN}}--dry-run{{end}}
      dir: ../../../k8s-lab/components/n8n

Error Handling:

  • Network failures: Retry with exponential backoff (configure a requests HTTPAdapter with urllib3 Retry; requests does not retry by default)
  • API errors: Log error details and continue with remaining workflows
  • Authentication errors: Clear error message with troubleshooting steps
  • Invalid parameters: Validate parameters before API calls

Safety Features:

  • Dry-run mode: Preview deletions before executing
  • Progress reporting: Show per-page progress and running totals
  • Error continuation: Don’t stop entire operation on single failure
  • Summary statistics: Report total, deleted, and failed counts

Progress Reporting Format:

Fetching page 1, found 0 workflows so far...
Fetching page 2, found 100 workflows so far...
Fetching page 3, found 200 workflows so far...
Total workflows fetched: 247

Found 15 workflows matching criteria

✓ Deleted workflow abc123 (1/15)
✓ Deleted workflow def456 (2/15)
✗ Failed to delete workflow ghi789: 404 Not Found
✓ Deleted workflow jkl012 (3/15)
...

Summary:
  Total: 15
  Deleted: 14
  Failed: 1

Emergency Data Reset Tool

The Emergency Data Reset Tool provides a nuclear option for recovering from extreme data accumulation or corruption scenarios where API-based cleanup is impractical.

Implementation Approach:

The reset tool is implemented as a Taskfile task that orchestrates Kubernetes operations to completely reset an n8n instance.

Task Location: k8s-lab/components/n8n/Taskfile.yaml

Core Functionality:

  1. User Confirmation

    • Require explicit confirmation phrase to prevent accidental execution
    • Display comprehensive warning about data loss
    • List all resources that will be deleted
  2. Namespace Deletion

    • Delete the entire n8n namespace
    • Wait for graceful termination
    • Handle stuck resources with force deletion if needed
  3. Persistent Storage Cleanup

    • Identify and delete all PVCs associated with n8n
    • Force cleanup of stuck PVCs that may remain after namespace deletion
    • Ensure no orphaned storage remains
  4. Environment Restoration

    • Recreate the n8n namespace
    • Apply necessary labels for secret distribution (ESO integration)
    • Prepare environment for fresh deployment
  5. Post-Reset Guidance

    • Provide clear next steps for redeploying n8n
    • Include commands for setting up admin credentials
    • Reference relevant documentation

Task Interface (pseudocode; cmds shown as descriptive steps, not literal shell):

nuclear:cleanup:
  desc: Nuclear option - Delete entire n8n namespace and PVCs to clean up extreme data accumulation (DESTRUCTIVE!)
  cmds:
    - Display comprehensive warning
    - Require confirmation phrase: "DELETE EVERYTHING"
    - Delete n8n namespace
    - Wait for namespace termination
    - Force cleanup stuck PVCs
    - Recreate namespace with labels
    - Display next steps
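The confirmation gate and orchestration above can be sketched as follows. This is an illustrative Python version (the real tool is a Taskfile task); the kubectl commands are listed but only run when dry_run=False:

```python
import subprocess

CONFIRM_PHRASE = "DELETE EVERYTHING"

def confirmed(user_input: str) -> bool:
    # Only an exact match of the phrase proceeds to deletion
    return user_input.strip() == CONFIRM_PHRASE

def nuclear_cleanup(user_input: str, namespace: str = "n8n",
                    dry_run: bool = True) -> str:
    if not confirmed(user_input):
        return "aborted"
    cmds = [
        ["kubectl", "delete", "namespace", namespace, "--wait=true"],
        ["kubectl", "create", "namespace", namespace],
    ]
    if not dry_run:
        for cmd in cmds:
            subprocess.run(cmd, check=True)
    return "done"
```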

Safety Features:

  • Explicit Confirmation: Requires typing exact phrase “DELETE EVERYTHING”
  • Comprehensive Warning: Lists all data that will be lost
  • Step-by-Step Progress: Shows progress through each phase
  • Timeout Handling: Gracefully handles stuck resources
  • Recovery Guidance: Provides clear next steps after completion

Use Cases:

  • N8n instance has accumulated tens of thousands of workflows
  • API-based cleanup would take hours or days
  • Database corruption requires fresh start
  • Testing deployment procedures from clean state
  • Recovering from misconfiguration that created excessive data

Workflow:

# 1. Attempt API-based cleanup first (if practical)
task n8n:workflows:cleanup:blank
 
# 2. If API cleanup is impractical, use nuclear option
task n8n:nuclear:cleanup
# Type: DELETE EVERYTHING
 
# 3. Redeploy n8n
kubectl apply -k k8s-lab/supporting-applications/n8n/
 
# 4. Wait for deployment
kubectl wait --for=condition=ready pod -l app=n8n -n n8n --timeout=300s
 
# 5. Operator will automatically provision admin user
# Verify in operator logs or n8n resource status

Integration with Operator:

After nuclear cleanup and redeployment:

  1. N8n operator detects new n8n instance
  2. Operator provisions fresh admin user automatically (Requirement 10)
  3. Operator creates API credentials secret
  4. N8n instance becomes ready for workflow management

Error Handling:

  • Namespace stuck in terminating: Force cleanup with grace period 0
  • PVCs won’t delete: Use force deletion with finalizer removal
  • Timeout during cleanup: Continue with next steps, report warnings
  • Namespace recreation fails: Report error with troubleshooting steps

N8n API Authentication

The operator requires API authentication to interact with n8n instances. This is handled through a two-phase approach:

Phase 1: N8n Instance API Key Management (N8n Controller)

When the N8n controller creates an n8n instance, it:

  1. Generates a random API key (32-character alphanumeric string)
  2. Stores the API key in a Kubernetes secret named <n8n-instance-name>-api-credentials in the same namespace
  3. Mounts the secret as a file in the n8n container at /run/secrets/n8n/apiKey
  4. Configures the n8n deployment with the N8N_API_KEY_FILE environment variable pointing to the mounted file
  5. Updates the N8n resource status to indicate the API credentials secret name
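Step 1 can be sketched as follows. The operator itself would use Go's crypto/rand; this Python equivalent shows the intended logic for a 32-character alphanumeric key:

```python
import secrets
import string

def generate_api_key(length: int = 32) -> str:
    """Cryptographically random alphanumeric key (default 32 chars)."""
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))

key = generate_api_key()
```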

Security Rationale: Mounting secrets as files instead of environment variables follows Kubernetes security best practices:

  • Environment variables can be exposed in logs, process listings, and crash dumps
  • File-based secrets have more restrictive permissions (0400)
  • File-based secrets can be rotated without restarting the container (if n8n supports hot reload)

Secret Format:

apiVersion: v1
kind: Secret
metadata:
  name: n8n-api-credentials
  namespace: n8n
type: Opaque
data:
  apiKey: <base64-encoded-random-key>

N8n Deployment Configuration:

spec:
  template:
    spec:
      containers:
      - name: n8n
        env:
        - name: N8N_API_KEY_FILE
          value: /run/secrets/n8n/apiKey
        volumeMounts:
        - name: api-credentials
          mountPath: /run/secrets/n8n
          readOnly: true
      volumes:
      - name: api-credentials
        secret:
          secretName: n8n-api-credentials
          defaultMode: 0400

Note: If n8n does not support N8N_API_KEY_FILE, the operator will need to read the file and set N8N_API_KEY via an init container or wrapper script. The design prioritizes security by using file-based secrets.

Phase 2: Administrative User Provisioning

When the N8n controller creates an n8n instance, it must provision an administrative user that the operator can use for API operations:

Administrative User Creation Flow:

  1. Wait for n8n instance and database to be ready
  2. Generate secure credentials (email: operator@n8n.local, random password)
  3. Connect to PostgreSQL database directly from operator
  4. Create administrative user via database INSERT with bcrypt password hash
  5. Store admin credentials in the same <n8n-instance-name>-api-credentials secret
  6. Verify admin user can authenticate via n8n API
  7. Mark n8n instance as ready only after successful verification

Implementation Approach:

The operator will connect directly to the PostgreSQL database during reconciliation:

Database Connection:

  • Operator retrieves PostgreSQL credentials from secrets
  • Establishes connection to PostgreSQL using Go database/sql library
  • Executes SQL commands to create/update admin user
  • Closes connection after operation completes

Admin User Creation SQL:

INSERT INTO "user" (id, email, password, "firstName", "lastName", "globalRole", "createdAt", "updatedAt")
VALUES ($1, $2, $3, 'Operator', 'Admin', 'owner', NOW(), NOW())
ON CONFLICT (email) DO UPDATE SET password = EXCLUDED.password

Password Hashing:

  • Use Go’s golang.org/x/crypto/bcrypt package
  • Generate bcrypt hash with appropriate cost factor (10-12)
  • Store hashed password in database (never plaintext)

Secret Structure (Updated):

apiVersion: v1
kind: Secret
metadata:
  name: n8n-api-credentials
  namespace: n8n
type: Opaque
data:
  apiKey: <base64-encoded-random-key>
  adminEmail: <base64-encoded-email>
  adminPassword: <base64-encoded-password>

Reconciliation Logic:

func (r *N8nReconciler) reconcileAdminUser(ctx context.Context, n8n *n8nv1alpha1.N8n) error {
    // 1. Check if admin user already exists and is valid
    if err := r.verifyAdminUser(ctx, n8n); err == nil {
        return nil
    }
    
    // 2. Generate admin credentials
    adminEmail := "operator@n8n.local"
    adminPassword := generateSecurePassword()
    
    // 3. Store credentials in secret
    if err := r.createOrUpdateAdminSecret(ctx, n8n, adminEmail, adminPassword); err != nil {
        return err
    }
    
    // 4. Connect to database and create user
    if err := r.createAdminUserInDatabase(ctx, n8n, adminEmail, adminPassword); err != nil {
        return err
    }
    
    // 5. Verify admin user via n8n API
    if err := r.verifyAdminUser(ctx, n8n); err != nil {
        return err
    }
    
    // 6. Update status
    n8n.Status.AdminUserProvisioned = true
    return r.Status().Update(ctx, n8n)
}

Verification Process:

After provisioning, the operator verifies the admin user:

  1. Attempt to authenticate with admin credentials via n8n API
  2. Verify the user has owner/admin role
  3. Test basic API operations (list workflows, etc.)
  4. Update N8n resource status with provisioning state
  5. Only mark instance as “Ready” after successful verification

Error Handling:

  • If database is not ready, requeue with exponential backoff
  • If user creation fails, report error in N8n resource status
  • If verification fails, mark instance as “Degraded” with error details
  • Support automatic recovery via operator reconciliation

Cleanup: When an N8n instance is deleted:

  1. Delete the admin user from n8n database
  2. Delete the <n8n-instance-name>-api-credentials secret
  3. Clean up via finalizer logic in operator

Phase 3: N8nWorkflow Controller Authentication

When the N8nWorkflow controller needs to interact with an n8n instance, it:

  1. Reads the N8n resource to get the API credentials secret name
  2. Retrieves the API key from the secret
  3. Creates an authenticated N8n API client using the API key
  4. Uses the client for all workflow and credential operations

Authentication Flow:

1. N8nWorkflow references N8n instance (n8nRef)
2. Controller fetches N8n resource
3. Controller reads API credentials secret name from N8n status
4. Controller retrieves API key from secret
5. Controller creates authenticated N8n client
6. Controller performs workflow operations
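Steps 3-5 can be sketched as follows: decode the apiKey field from the credentials secret and build the X-N8N-API-KEY header the n8n public API expects (the real controller does this in Go):

```python
import base64

def api_headers_from_secret(secret_data: dict) -> dict:
    # Kubernetes secret data is base64-encoded; decode before use
    api_key = base64.b64decode(secret_data["apiKey"]).decode("utf-8")
    return {"X-N8N-API-KEY": api_key}

headers = api_headers_from_secret(
    {"apiKey": base64.b64encode(b"abc123").decode()})
```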

Error Handling:

  • If API credentials secret is missing, controller reports error and requeues
  • If API key is invalid (401 response), controller reports authentication error
  • If N8n instance is not ready, controller waits for instance to become ready

Database Lifecycle Management

The operator implements database lifecycle safety controls to prevent accidental data loss or corruption while providing flexibility for different deployment scenarios.

Design Principles:

  • Safe by default: Prevent accidental database reuse and data corruption
  • Explicit opt-in: Require explicit configuration for potentially dangerous operations
  • Development-friendly defaults: Automatic cleanup for ephemeral environments
  • Production-ready options: Data retention for production deployments

Database Lifecycle Configuration

The N8n CRD is extended with database lifecycle controls:

apiVersion: n8n.slys.dev/v1alpha1
kind: N8n
metadata:
  name: n8n-dev
spec:
  database:
    postgres:
      host: postgresql
      port: 5432
      database: n8n_dev
      user: n8n
      password: secret
    # Lifecycle controls
    adoptExisting: false  # Default: refuse to use existing database
    deletionPolicy: Delete  # Default: drop database on N8n deletion

Configuration Fields:

  1. adoptExisting (boolean, default: false)

    • Controls whether the operator can connect to a database with existing n8n data
    • false (default): Operator validates database is empty before provisioning
    • true: Operator allows connecting to database with existing n8n data
  2. deletionPolicy (string, default: “Delete”)

    • Controls what happens to the database when the N8n CR is deleted
    • Delete (default): Operator drops all n8n data from the database
    • Retain: Operator leaves the database intact
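The semantics of the two fields, written as plain decision functions (assumed directly from the descriptions above):

```python
def must_validate_empty(adopt_existing: bool = False) -> bool:
    """adoptExisting=false (default): operator checks the database is empty."""
    return not adopt_existing

def drop_data_on_delete(deletion_policy: str = "Delete") -> bool:
    """deletionPolicy=Delete (default): drop n8n data when the CR is deleted."""
    return deletion_policy == "Delete"
```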

Type Definitions:

type Database struct {
    Postgres        Postgres       `json:"postgres"`
    AdoptExisting   bool           `json:"adoptExisting,omitempty"`
    DeletionPolicy  DeletionPolicy `json:"deletionPolicy,omitempty"`
}
 
type DeletionPolicy string
 
const (
    DeletionPolicyDelete DeletionPolicy = "Delete"
    DeletionPolicyRetain DeletionPolicy = "Retain"
)

CRD Validation:

// +kubebuilder:validation:Enum=Delete;Retain
// +kubebuilder:default="Delete"
DeletionPolicy DeletionPolicy `json:"deletionPolicy,omitempty"`
 
// +kubebuilder:default=false
AdoptExisting bool `json:"adoptExisting,omitempty"`

Database Validation Logic

Validation Flow:

func (r *N8nReconciler) validateDatabaseLifecycle(ctx context.Context, n8n *n8nv1alpha1.N8n) error {
    // Skip validation if adoptExisting is true
    if n8n.Spec.Database.AdoptExisting {
        log.Info("Adopting existing database (adoptExisting=true)")
        return nil
    }
    
    // Connect to database
    db, err := NewDatabaseConnection(ctx, n8n)
    if err != nil {
        return fmt.Errorf("failed to connect to database: %w", err)
    }
    defer db.Close()
    
    // Check if n8n tables exist
    hasN8nData, err := r.checkN8nTablesExist(ctx, db)
    if err != nil {
        return fmt.Errorf("failed to check for existing n8n data: %w", err)
    }
    
    if hasN8nData {
        return fmt.Errorf("database contains existing n8n data; set adoptExisting=true to reuse or use a different database")
    }
    
    return nil
}
 
func (r *N8nReconciler) checkN8nTablesExist(ctx context.Context, db *DatabaseConnection) (bool, error) {
    // Check for key n8n tables (user, workflow, credentials, etc.)
    query := `
        SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE table_schema = 'public' 
            AND table_name IN ('user', 'workflow_entity', 'credentials_entity')
        )
    `
    
    var exists bool
    err := db.QueryRow(ctx, query).Scan(&exists)
    if err != nil {
        return false, err
    }
    
    return exists, nil
}

Validation Timing:

The validation occurs during the reconciliation loop:

  1. Before creating the n8n deployment
  2. After database connection is established
  3. Before provisioning the admin user

Error Reporting:

When validation fails, the operator:

  1. Updates the N8n resource status with a clear error condition
  2. Provides actionable remediation steps in the error message
  3. Does not retry until the resource is updated
Example:

if err := r.validateDatabaseLifecycle(ctx, n8n); err != nil {
    return r.updateStatus(ctx, n8n, typeDegradedN8n, metav1.ConditionTrue, 
        "DatabaseValidationFailed",
        fmt.Sprintf("Database lifecycle validation failed: %v. "+
            "To reuse existing database, set spec.database.adoptExisting=true. "+
            "To use a fresh database, specify a different database name.", err))
}

Database Cleanup Logic

Cleanup Flow:

Database cleanup is triggered by the N8n resource deletion via a finalizer:

const n8nFinalizerName = "n8n.slys.dev/database-cleanup"
 
func (r *N8nReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    n8n := &n8nv1alpha1.N8n{}
    if err := r.Get(ctx, req.NamespacedName, n8n); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    
    // Handle deletion
    if !n8n.DeletionTimestamp.IsZero() {
        return r.handleDeletion(ctx, n8n)
    }
    
    // Add finalizer if not present
    if !controllerutil.ContainsFinalizer(n8n, n8nFinalizerName) {
        controllerutil.AddFinalizer(n8n, n8nFinalizerName)
        return ctrl.Result{}, r.Update(ctx, n8n)
    }
    
    // Normal reconciliation...
}
 
func (r *N8nReconciler) handleDeletion(ctx context.Context, n8n *n8nv1alpha1.N8n) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    
    if !controllerutil.ContainsFinalizer(n8n, n8nFinalizerName) {
        return ctrl.Result{}, nil
    }
    
    // Perform cleanup based on deletionPolicy
    if n8n.Spec.Database.DeletionPolicy == n8nv1alpha1.DeletionPolicyDelete {
        if err := r.cleanupDatabase(ctx, n8n); err != nil {
            log.Error(err, "Failed to cleanup database")
            // Report error but allow deletion to proceed
            r.updateStatus(ctx, n8n, typeDegradedN8n, metav1.ConditionTrue,
                "DatabaseCleanupFailed",
                fmt.Sprintf("Database cleanup failed: %v. Manual cleanup may be required.", err))
        }
    } else {
        log.Info("Retaining database (deletionPolicy=Retain)")
    }
    
    // Remove finalizer
    controllerutil.RemoveFinalizer(n8n, n8nFinalizerName)
    return ctrl.Result{}, r.Update(ctx, n8n)
}
 
func (r *N8nReconciler) cleanupDatabase(ctx context.Context, n8n *n8nv1alpha1.N8n) error {
    log := log.FromContext(ctx)
    
    // Connect to database
    db, err := NewDatabaseConnection(ctx, n8n)
    if err != nil {
        return fmt.Errorf("failed to connect to database for cleanup: %w", err)
    }
    defer db.Close()
    
    // Drop all n8n tables
    tables := []string{
        "workflow_entity",
        "credentials_entity",
        "execution_entity",
        "user",
        "settings",
        "tag_entity",
        "webhook_entity",
        // Add other n8n tables as needed
    }
    
    for _, table := range tables {
        // %q double-quotes the identifier; "user" is a reserved word in PostgreSQL
        query := fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", table)
        if _, err := db.ExecuteQuery(ctx, query); err != nil {
            log.Error(err, "Failed to drop table", "table", table)
            // Continue with other tables
        } else {
            log.Info("Dropped table", "table", table)
        }
    }
    
    log.Info("Database cleanup completed")
    return nil
}

Cleanup Safety:

  • Best-effort cleanup: Errors during cleanup are logged but don’t block deletion
  • Cascade deletion: Use CASCADE to handle foreign key constraints
  • Finalizer removal: Always remove finalizer to prevent stuck resources
  • Status reporting: Update status with cleanup results before finalizer removal

Status Reporting

The N8n status is extended to report database lifecycle state:

type N8nStatus struct {
    // ... existing fields ...
    
    DatabaseState DatabaseState `json:"databaseState,omitempty"`
}
 
type DatabaseState struct {
    Validated       bool         `json:"validated"`
    AdoptedExisting bool         `json:"adoptedExisting,omitempty"`
    DeletionPolicy  string       `json:"deletionPolicy"`
    LastValidation  *metav1.Time `json:"lastValidation,omitempty"`
}

Status Updates:

status:
  databaseState:
    validated: true
    adoptedExisting: false
    deletionPolicy: Delete
    lastValidation: "2024-01-15T10:30:00Z"
  conditions:
  - type: DatabaseReady
    status: "True"
    reason: DatabaseValidated
    message: "Database validated and ready for use"

Use Case Examples

Development Environment (Default):

apiVersion: n8n.slys.dev/v1alpha1
kind: N8n
metadata:
  name: n8n-dev
spec:
  database:
    postgres:
      host: postgresql
      database: n8n_dev
      user: n8n
      password: secret
    # Defaults apply:
    # adoptExisting: false (prevent reuse)
    # deletionPolicy: Delete (automatic cleanup)

Production Environment (Data Retention):

apiVersion: n8n.slys.dev/v1alpha1
kind: N8n
metadata:
  name: n8n-prod
spec:
  database:
    postgres:
      host: postgresql.prod.svc
      database: n8n_prod
      user: n8n
      password: secret
    adoptExisting: false  # Still prevent accidental reuse
    deletionPolicy: Retain  # Keep database on deletion

Disaster Recovery (Reconnect to Existing):

apiVersion: n8n.slys.dev/v1alpha1
kind: N8n
metadata:
  name: n8n-restored
spec:
  database:
    postgres:
      host: postgresql
      database: n8n_backup
      user: n8n
      password: secret
    adoptExisting: true  # Explicitly allow reusing existing data
    deletionPolicy: Retain  # Don't delete the restored data

Error Scenarios and Handling

Scenario 1: Accidental Database Collision

User creates N8n CR pointing to database with existing data
→ Operator validates database
→ Finds existing n8n tables
→ Rejects provisioning with clear error message
→ User must either:
  - Set adoptExisting=true (if intentional)
  - Use a different database name (if accidental)

Scenario 2: Cleanup Failure

User deletes N8n CR with deletionPolicy=Delete
→ Operator attempts database cleanup
→ Cleanup fails (e.g., connection error)
→ Operator logs error and updates status
→ Operator removes finalizer (allows CR deletion)
→ User can manually clean up database if needed

Scenario 3: Multiple Instances, Same Database

User creates two N8n CRs pointing to same database
→ First instance: validates and provisions successfully
→ Second instance: validation detects existing data
→ Second instance: rejects provisioning
→ Prevents data corruption from shared database
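The collision checks in Scenarios 1 and 3 reduce to a small decision function. A minimal sketch follows; the `validateDatabaseAdoption` name and the sentinel table list are assumptions for illustration, not the operator’s actual API:

```go
package main

import "fmt"

// knownN8nTables are sentinel tables whose presence indicates an existing
// n8n installation in the target database (assumed list, not exhaustive).
var knownN8nTables = map[string]bool{
	"workflow_entity":    true,
	"credentials_entity": true,
	"execution_entity":   true,
}

// validateDatabaseAdoption decides whether provisioning may proceed, given
// the tables found in the target database and the adoptExisting flag.
func validateDatabaseAdoption(existingTables []string, adoptExisting bool) error {
	for _, t := range existingTables {
		if knownN8nTables[t] {
			if adoptExisting {
				return nil // adoption explicitly allowed
			}
			return fmt.Errorf("database contains existing n8n table %q; "+
				"set adoptExisting=true to reuse it or choose another database", t)
		}
	}
	return nil // no n8n tables found: database is safe to provision
}

func main() {
	// Empty database: provisioning proceeds.
	fmt.Println(validateDatabaseAdoption([]string{"some_other_table"}, false))
	// Existing n8n data without adoption: rejected with an actionable message.
	fmt.Println(validateDatabaseAdoption([]string{"workflow_entity"}, false))
	// Existing n8n data with adoption: allowed.
	fmt.Println(validateDatabaseAdoption([]string{"workflow_entity"}, true))
}
```

The actual table discovery would come from querying `information_schema.tables`; the decision logic itself stays this simple.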

Data Models

N8nWorkflow Specification

type N8nWorkflowSpec struct {
    N8nRef      N8nReference      `json:"n8nRef"`
    Workflow    WorkflowDefinition `json:"workflow"`
    Credentials []CredentialSpec   `json:"credentials,omitempty"`
}
 
type N8nReference struct {
    Name      string `json:"name"`
    Namespace string `json:"namespace,omitempty"`
}
 
type WorkflowDefinition struct {
    Name        string                 `json:"name"`
    Active      bool                   `json:"active"`
    Tags        []string               `json:"tags,omitempty"`
    Nodes       []WorkflowNode         `json:"nodes"`
    Connections map[string]interface{} `json:"connections"`
    Settings    map[string]interface{} `json:"settings,omitempty"`
}
 
type CredentialSpec struct {
    Name      string           `json:"name"`
    Type      string           `json:"type"`
    SecretRef SecretReference  `json:"secretRef"`
}

Status Model

type N8nWorkflowStatus struct {
    WorkflowID string             `json:"workflowId,omitempty"`
    Active     bool               `json:"active"`
    LastSync   *metav1.Time       `json:"lastSync,omitempty"`
    Conditions []metav1.Condition `json:"conditions,omitempty"`
}

Correctness Properties

A property is a characteristic or behavior that should hold true across all valid executions of a system: essentially, a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.

Analysis of the acceptance criteria, after eliminating redundant criteria, yields the following testable properties:

Property 1: CRD Schema Validation

For any N8nWorkflow resource submitted to the Kubernetes API, the CRD schema should validate all required fields (name, nodes, connections, n8nRef) and reject resources with missing or malformed data.

Validates: Requirements 1.1, 1.5, 2.1, 3.1, 4.1, 6.1, 6.5

Property 2: Workflow Creation Synchronization

For any valid N8nWorkflow resource created in Kubernetes, the controller should create a corresponding workflow in the referenced n8n instance with matching configuration.

Validates: Requirements 1.2

Property 3: Workflow Update Synchronization

For any existing N8nWorkflow resource that is updated, the controller should update the corresponding workflow in n8n to match the new configuration.

Validates: Requirements 1.3, 1.6

Property 4: Workflow Deletion Cleanup

For any N8nWorkflow resource that is deleted, the controller should remove the corresponding workflow from the n8n instance.

Validates: Requirements 1.4

Property 5: N8n Instance Validation

For any N8nWorkflow resource referencing an n8n instance, the controller should validate that the instance exists and is accessible before attempting workflow operations.

Validates: Requirements 2.2, 6.3

Property 6: Workflow Independence

For any set of N8nWorkflow resources targeting the same n8n instance, the controller should manage each workflow independently without interference.

Validates: Requirements 2.4, 2.5

Property 7: Credential Injection Ordering

For any N8nWorkflow resource requiring credentials, the credential manager should create all required credentials in n8n before creating the workflow.

Validates: Requirements 3.2

Property 8: Credential Type Support

For any supported credential type (HTTP Basic Auth, API keys, OAuth tokens), the credential manager should correctly process and inject the credential data into n8n.

Validates: Requirements 3.3, 3.5

Property 9: Credential Synchronization

For any Kubernetes secret referenced by a workflow credential, when the secret data changes, the controller should update the corresponding n8n credential.

Validates: Requirements 3.4

Property 10: Workflow Activation Control

For any N8nWorkflow resource with the active field set to true or false, the controller should activate or deactivate the workflow in n8n accordingly.

Validates: Requirements 4.2, 4.3

Property 11: Status Reporting Completeness

For any workflow operation (create, update, activate), the controller should update the resource status with workflow ID, active state, and lastSync timestamp.

Validates: Requirements 4.4, 5.1, 5.2, 5.4

Property 12: Health Monitoring

For any managed workflow, the controller should be able to verify the workflow’s existence and state in n8n through health checks.

Validates: Requirements 5.5

Property 13: Comprehensive Error Reporting

For any operation that fails (validation, n8n instance not found, activation failure, sync failure), the controller should report descriptive error conditions in the resource status.

Validates: Requirements 2.3, 4.5, 5.3, 6.2, 6.4

Property 14: Paginated Workflow Retrieval

For any n8n instance with workflows across multiple pages, the cleanup tool should retrieve all workflows by following pagination cursors until no more pages remain.

Validates: Requirements 8.1, 8.2

Property 15: Filtered Workflow Deletion

For any name filter pattern provided to the cleanup tool, only workflows matching that pattern should be deleted, and all matching workflows should be deleted.

Validates: Requirements 8.3, 8.7

Property 16: Cleanup Error Resilience

For any bulk deletion operation where some workflows fail to delete, the cleanup tool should continue processing remaining workflows and report all errors at the end.

Validates: Requirements 8.6
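Properties 14 through 16 can be exercised against a small abstraction of the cleanup tool. The sketch below assumes a cursor-paginated `WorkflowAPI` interface and glob-style name filtering; both are modelling assumptions, not the exact n8n client:

```go
package main

import (
	"errors"
	"fmt"
	"path"
)

type Workflow struct {
	ID   string
	Name string
}

// WorkflowAPI abstracts the subset of the n8n API the cleanup tool needs.
type WorkflowAPI interface {
	// List returns one page plus the cursor for the next page;
	// an empty cursor means no more pages (Property 14).
	List(cursor string) (items []Workflow, next string, err error)
	Delete(id string) error
}

// cleanupWorkflows follows pagination to the end, deletes only workflows
// whose names match the glob pattern (Property 15), continues past per-item
// failures, and returns all collected errors at the end (Property 16).
func cleanupWorkflows(api WorkflowAPI, pattern string) (deleted int, errs []error) {
	cursor := ""
	for {
		page, next, err := api.List(cursor)
		if err != nil {
			return deleted, append(errs, err)
		}
		for _, wf := range page {
			if ok, _ := path.Match(pattern, wf.Name); !ok {
				continue
			}
			if err := api.Delete(wf.ID); err != nil {
				errs = append(errs, fmt.Errorf("delete %s: %w", wf.ID, err))
				continue // resilience: keep processing remaining workflows
			}
			deleted++
		}
		if next == "" {
			return deleted, errs
		}
		cursor = next
	}
}

// fakeAPI serves two pages and fails to delete one matching workflow.
type fakeAPI struct{}

func (fakeAPI) List(cursor string) ([]Workflow, string, error) {
	if cursor == "" {
		return []Workflow{{"1", "test-a"}, {"2", "prod-x"}}, "p2", nil
	}
	return []Workflow{{"3", "test-b"}}, "", nil
}

func (fakeAPI) Delete(id string) error {
	if id == "1" {
		return errors.New("workflow locked")
	}
	return nil
}

func main() {
	deleted, errs := cleanupWorkflows(fakeAPI{}, "test-*")
	fmt.Println(deleted, len(errs))
}
```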

Property 17: Database Emptiness Validation

For any N8n instance created with adoptExisting=false (the default), the operator should reject provisioning if the target database contains existing n8n tables.

Validates: Requirements 11.1, 11.2

Property 18: Database Adoption Control

For any N8n instance created with adoptExisting=true, the operator should allow provisioning even when the target database contains existing n8n data.

Validates: Requirements 11.3

Property 19: Database Isolation Enforcement

For any two N8n instances referencing the same database, the operator should prevent the second instance from provisioning unless adoptExisting=true.

Validates: Requirements 11.4, 11.6

Property 20: Database Cleanup Execution

For any N8n instance deleted with deletionPolicy=Delete (the default), the operator should remove all n8n tables from the database.

Validates: Requirements 11.9, 11.11

Property 21: Database Retention Preservation

For any N8n instance deleted with deletionPolicy=Retain, the operator should leave the database intact without removing any data.

Validates: Requirements 11.8, 11.10

Property 22: Database Lifecycle Status Reporting

For any N8n instance, the operator should report the database validation state, adoption status, and deletion policy in the resource status.

Validates: Requirements 11.5, 11.13

Error Handling

The system implements comprehensive error handling at multiple levels:

Controller-Level Error Handling

  • Retry Logic: Exponential backoff for transient failures
  • Error Classification: Distinguish between retryable and permanent errors
  • Status Updates: Always update resource status with error information
  • Logging: Structured logging for debugging and monitoring

API Client Error Handling

  • HTTP Error Mapping: Map n8n API errors to meaningful controller errors
  • Timeout Handling: Configurable timeouts for API operations
  • Connection Failures: Retry logic for network-related failures
  • Authentication Errors: Clear error messages for credential issues

Validation Error Handling

  • Schema Validation: OpenAPI schema validation at admission time
  • Business Logic Validation: Additional validation in controller
  • Credential Validation: Verify secret references and credential types
  • N8n Instance Validation: Verify n8n instance accessibility

Error Recovery Strategies

1. Transient Errors (network, temporary n8n unavailability):
   - Exponential backoff retry
   - Maximum retry attempts
   - Requeue for later processing

2. Configuration Errors (invalid workflow, missing secrets):
   - Update status with error details
   - Do not retry until resource is updated
   - Clear error messages for user action

3. Permanent Errors (n8n instance not found, invalid credentials):
   - Update status with permanent error condition
   - Stop reconciliation until resource is fixed
   - Provide actionable error messages

Testing Strategy

The testing strategy follows a comprehensive approach with both unit tests and property-based tests to ensure correctness and reliability.

Unit Testing Approach

Controller Unit Tests:

  • Mock n8n API client for isolated testing
  • Test individual reconciliation scenarios
  • Verify error handling and status updates
  • Test credential processing logic

API Client Unit Tests:

  • Mock HTTP responses from n8n API
  • Test request/response serialization
  • Verify error handling and retry logic
  • Test authentication and authorization

Credential Manager Unit Tests:

  • Mock Kubernetes secret access
  • Test credential type processing
  • Verify secret data extraction and transformation
  • Test credential lifecycle management

Property-Based Testing Configuration

Testing Framework: Use Go’s testing framework with a property-based testing library (e.g., gopter or rapid)

Test Configuration:

  • Minimum 100 iterations per property test
  • Each property test references its design document property
  • Tag format: Feature: n8n-workflow-management, Property {number}: {property_text}

Property Test Implementation: Each correctness property will be implemented as a property-based test that:

  1. Generates random valid inputs within the property’s domain
  2. Executes the system operation
  3. Verifies the expected property holds
  4. Reports any counterexamples for debugging

Integration Testing:

  • Test against real n8n instances in test environments
  • Verify end-to-end workflow creation and management
  • Test credential injection with real secrets
  • Validate ArgoCD integration and GitOps workflows

Test Data Management

Test Fixtures:

  • Sample N8nWorkflow resources for various scenarios
  • Mock n8n API responses for different workflow types
  • Test secrets with various credential types
  • Error scenarios and edge cases

Test Environment:

  • Isolated test namespaces for each test run
  • Clean up resources after each test
  • Use test-specific n8n instances when possible
  • Mock external dependencies for unit tests

The testing approach ensures that both specific examples work correctly (unit tests) and that universal properties hold across all possible inputs (property-based tests), providing comprehensive coverage and confidence in the system’s correctness.