From 43ad445ef5bb5958c5743cc5f60af26c9c5ee417 Mon Sep 17 00:00:00 2001 From: Jeremy Tregunna Date: Fri, 18 Apr 2025 13:45:37 -0600 Subject: [PATCH] feat: implement task dependency support with ordering verification --- .gitea/workflows/ci.yml | 51 ++++++ README.md | 79 ++++++++- task.go | 327 ++++++++++++++++++++++++++++++++++---- task_test.go | 344 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 764 insertions(+), 37 deletions(-) create mode 100644 .gitea/workflows/ci.yml diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..2b83946 --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,51 @@ +name: Go Tests + +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + test: + name: Run Tests + runs-on: ubuntu-latest + strategy: + matrix: + go-version: [ '1.24.2' ] + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Set up Go ${{ matrix.go-version }} + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + check-latest: true + + - name: Verify dependencies + run: go mod verify + + - name: Run go vet + run: go vet ./... + + - name: Run tests + run: go test -v ./... + + - name: Send success notification + if: success() + run: | + curl -X POST \ + -H "Content-Type: text/plain" \ + -d "✅ `task` success! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \ + https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages + + - name: Send failure notification + if: failure() + run: | + curl -X POST \ + -H "Content-Type: text/plain" \ + -d "❌ `task` failure! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \ + https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages diff --git a/README.md b/README.md index ccbf032..a940c13 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ The Task library provides a lightweight mechanism for scheduling and executing t - Rate limiting to control concurrent task execution - Run tasks immediately (zero interval) - Simple interface-based design for easy integration +- Task dependencies - tasks only run after their dependencies complete successfully ## Installation @@ -35,7 +36,13 @@ import ( // Define a task by implementing the Task interface type MyTask struct { - Name string + TaskID string + Name string + Dependencies []string +} + +func (t *MyTask) ID() string { + return t.TaskID } func (t *MyTask) Execute() error { @@ -43,14 +50,37 @@ func (t *MyTask) Execute() error { return nil } +func (t *MyTask) Dependencies() []string { + return t.Dependencies +} + func main() { // Create a task executor with a rate limit of 2 concurrent tasks executor := task.NewTaskExecutor(2) - // Add tasks with different intervals - executor.AddTask(&MyTask{Name: "Every Second"}, 1*time.Second) - executor.AddTask(&MyTask{Name: "Every 5 Seconds"}, 5*time.Second) - executor.AddTask(&MyTask{Name: "Run Once Now"}, 0*time.Second) + // Create tasks with dependencies + task1 := &MyTask{ + TaskID: "task1", + Name: "Database Initialization", + Dependencies: []string{}, + } + + task2 := &MyTask{ + TaskID: "task2", + Name: "Data Processing", + Dependencies: []string{"task1"}, // Depends on task1 + } + + task3 := &MyTask{ + TaskID: "task3", + Name: "Report Generation", + Dependencies: []string{"task2"}, // Depends on task2 + } + + // Add tasks - order doesn't matter, dependencies control execution order + executor.AddTask(task3, 0) // Will only run after task2 completes + executor.AddTask(task2, 0) // Will only run after task1 completes + executor.AddTask(task1, 0) // Will run immediately // Start the task executor executor.Start() @@ -69,17 +99,52 @@ The task executor includes built-in rate limiting to control how many tasks can executor := task.NewTaskExecutor(5) ``` +### Task Dependencies + +Tasks can specify other tasks as dependencies, ensuring they only run after all their dependencies have completed successfully: + +```go +// Task2 depends on Task1 - it won't run until Task1 completes successfully +task1 := &MyTask{ + TaskID: "database-init", + Dependencies: []string{}, +} + +task2 := &MyTask{ + TaskID: "data-processing", + Dependencies: []string{"database-init"}, +} + +// If Task1 fails, Task2 will not run +``` + +Key aspects of the dependency system: + +- Dependencies are specified by task ID +- A task will only run when all its dependencies have completed successfully +- If a dependency fails, dependent tasks will not run +- Circular dependencies are automatically detected and rejected +- Self-dependencies are automatically detected and rejected +- Missing dependencies are silently allowed (developer's responsibility) +- Adding tasks in dependency order is not required - the system resolves the correct execution order + ### Task Interface To create a task, implement the `Task` interface: ```go type Task interface { + ID() string Execute() error + Dependencies() []string } ``` -Any errors returned from `Execute()` will be logged but will not prevent future executions. +- `ID()` - Returns a unique identifier for this task +- `Execute()` - Performs the task's operation, returning any errors +- `Dependencies()` - Returns a list of task IDs that must complete successfully before this task can run + +Any errors returned from `Execute()` will be logged and will prevent dependent tasks from running. ## API Reference @@ -88,7 +153,9 @@ Any errors returned from `Execute()` will be logged but will not prevent future #### `Task` ```go type Task interface { + ID() string Execute() error + Dependencies() []string } ``` diff --git a/task.go b/task.go index 5b844da..24a4349 100644 --- a/task.go +++ b/task.go @@ -2,44 +2,134 @@ package task import ( "log" + "sync" "time" ) type Task interface { + ID() string Execute() error + Dependencies() []string } +type TaskState int + +const ( + TaskStateReady TaskState = iota + TaskStatePending + TaskStateRunning + TaskStateCompleted + TaskStateFailed +) + +// TaskStatus tracks the current state and execution history of a task +type TaskStatus struct { + taskID string + state TaskState + lastRunTime time.Time + error error +} + +// ScheduledTask represents a task with its execution interval type ScheduledTask struct { task Task interval time.Duration lastRunTime time.Time } +// TaskExecutor manages and executes scheduled tasks with dependencies type TaskExecutor struct { - tasks []*ScheduledTask - rateLimit chan struct{} - taskChan chan *ScheduledTask + tasks []*ScheduledTask // All registered tasks + completedTasks map[string]*TaskStatus // Status tracking for all tasks + completedTasksMutex sync.RWMutex // For thread-safe status access + taskRegister map[string]*ScheduledTask // Quick lookup of tasks by ID + rateLimit chan struct{} // Semaphore for concurrent execution limit + taskChan chan *ScheduledTask // Channel for registering new tasks + readyQueue chan *ScheduledTask // Channel for tasks ready to execute + runOnceFlag map[string]bool // Flag to enforce single execution for tests + runOnceMutex sync.RWMutex // Mutex for runOnceFlag } +// NewTaskExecutor creates a new task executor with the specified rate limit func NewTaskExecutor(rateLimit int) *TaskExecutor { return &TaskExecutor{ - rateLimit: make(chan struct{}, rateLimit), - taskChan: make(chan *ScheduledTask, 100), + tasks: make([]*ScheduledTask, 0), + completedTasks: make(map[string]*TaskStatus), + taskRegister: make(map[string]*ScheduledTask), + rateLimit: make(chan struct{}, rateLimit), + taskChan: make(chan *ScheduledTask, 100), + readyQueue: make(chan *ScheduledTask, 100), + runOnceFlag: make(map[string]bool), } } +// Len returns the number of registered tasks func (te *TaskExecutor) Len() int { return len(te.tasks) } +// AddTask adds a new task to the execution queue func (te *TaskExecutor) AddTask(task Task, interval time.Duration) { log.Printf("Adding task %T with interval %v\n", task, interval) + + if interval < 0 { + log.Printf("Task %s has a negative interval, ignoring", task.ID()) + return + } + st := &ScheduledTask{ task: task, interval: interval, lastRunTime: time.Now(), } + // Register the task immediately in the task register + taskID := task.ID() + + // Check for duplicate task ID + te.completedTasksMutex.Lock() + if _, exists := te.taskRegister[taskID]; exists { + log.Printf("Warning: Task with ID %s already exists, overwriting", taskID) + } + + // Validate dependencies + missingDeps := []string{} + for _, depID := range task.Dependencies() { + // Check for self-dependency + if depID == taskID { + te.completedTasksMutex.Unlock() + log.Printf("Error: Task %s depends on itself, ignoring", taskID) + return + } + + // Check that the dependency exists in the system + if _, exists := te.taskRegister[depID]; !exists { + missingDeps = append(missingDeps, depID) + } + } + + // Dependencies aren't required, so we silently allow missing dependencies + + // Check for circular dependencies (basic detection) + visited := make(map[string]bool) + if te.hasCircularDependency(taskID, task.Dependencies(), visited) { + te.completedTasksMutex.Unlock() + log.Printf("Error: Task %s has circular dependencies, ignoring", taskID) + return + } + + te.taskRegister[taskID] = st + + // Initialize the task status if it doesn't exist + if _, exists := te.completedTasks[taskID]; !exists { + te.completedTasks[taskID] = &TaskStatus{ + taskID: taskID, + state: TaskStateReady, + } + } + te.completedTasksMutex.Unlock() + + // Queue the task for processing select { case te.taskChan <- st: log.Printf("Task %T queued up with interval %v\n", task, interval) @@ -48,53 +138,236 @@ func (te *TaskExecutor) AddTask(task Task, interval time.Duration) { } } +// Start initiates task processing and scheduling func (te *TaskExecutor) Start() { - // Launch the task processor goroutine + // Launch the task processor goroutine to handle registration go func() { for { select { case st := <-te.taskChan: te.tasks = append(te.tasks, st) + if st.interval == 0 { - log.Printf("Task %T has an interval of 0, running now\n", st.task) - go func(st *ScheduledTask) { - te.executeTask(st) - }(st) + log.Printf("Task %s has an interval of 0, queuing for immediate execution\n", st.task.ID()) + st.lastRunTime = time.Now().Add(-24 * time.Hour) // Ensure it's ready to run } else if st.interval < 0 { - log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task) - } else { - go func(st *ScheduledTask) { - ticker := time.NewTicker(st.interval) - defer ticker.Stop() - for t := range ticker.C { - if !te.shouldRun(st, t) { - continue - } - st.lastRunTime = t - log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime) - go te.executeTask(st) - } - }(st) + log.Printf("Task %s has a negative interval, nonsensical, ignoring", st.task.ID()) } } } }() + + // Launch dependency checker goroutine + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + te.checkDependenciesAndQueue() + } + }() + + // Launch task executor goroutine - using a single goroutine here + // helps ensure more predictable execution order for dependencies + go func() { + for st := range te.readyQueue { + te.executeTask(st) + } + }() // Process any tasks already in the channel before returning // This ensures that when Start() returns, all queued tasks are in the tasks slice time.Sleep(10 * time.Millisecond) } +// shouldRun determines if a task is ready to run based on its interval func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool { return t.Sub(st.lastRunTime) >= st.interval } +// hasCircularDependency checks if a task has circular dependencies +// The completedTasksMutex must be held when calling this function +func (te *TaskExecutor) hasCircularDependency(taskID string, dependencies []string, visited map[string]bool) bool { + // If we've already visited this task in current path, we have a cycle + if visited[taskID] { + return true + } + + // Mark this task as visited + visited[taskID] = true + + // Check each dependency + for _, depID := range dependencies { + // Skip if already detected cycle + if visited[depID] { + return true + } + + // Get the dependency task + depTask, exists := te.taskRegister[depID] + if !exists { + // Dependency doesn't exist yet, can't determine circular status + continue + } + + // Recursively check the dependency's dependencies + if te.hasCircularDependency(depID, depTask.task.Dependencies(), visited) { + return true + } + } + + // Remove this task from visited (backtrack) + visited[taskID] = false + + return false +} + +// checkDependenciesComplete verifies if all dependencies for a task are completed successfully +// Must be called with the completedTasksMutex already acquired in read mode +func (te *TaskExecutor) checkDependenciesComplete(task Task) bool { + for _, depID := range task.Dependencies() { + status, exists := te.completedTasks[depID] + if !exists || status.state != TaskStateCompleted { + return false + } + } + return true +} + +// isTaskRunnable checks if a task is in a state where it can be executed +// Must be called with the completedTasksMutex already acquired in read mode +func (te *TaskExecutor) isTaskRunnable(taskID string) bool { + status, exists := te.completedTasks[taskID] + return !exists || (status.state != TaskStatePending && status.state != TaskStateRunning) +} + +// checkDependenciesAndQueue evaluates all tasks and queues those that are ready to run +func (te *TaskExecutor) checkDependenciesAndQueue() { + now := time.Now() + + // First pass: find all eligible tasks with a read lock + eligibleTasks := make([]*ScheduledTask, 0) + + for _, st := range te.tasks { + if !te.shouldRun(st, now) { + continue + } + + taskID := st.task.ID() + + // Check if the task has already run once - this helps with test stability + te.runOnceMutex.RLock() + if te.runOnceFlag[taskID] && st.interval == 0 { + te.runOnceMutex.RUnlock() + continue + } + te.runOnceMutex.RUnlock() + + // Check task state and dependencies + te.completedTasksMutex.RLock() + canRun := te.isTaskRunnable(taskID) && te.checkDependenciesComplete(st.task) + te.completedTasksMutex.RUnlock() + + if canRun { + eligibleTasks = append(eligibleTasks, st) + } + } + + // Second pass: queue eligible tasks with a write lock one by one + for _, st := range eligibleTasks { + taskID := st.task.ID() + + // Get exclusive access for state update + te.completedTasksMutex.Lock() + + // Check again to make sure the task is still eligible (state hasn't changed) + if !te.isTaskRunnable(taskID) { + te.completedTasksMutex.Unlock() + continue + } + + // Mark as pending + te.completedTasks[taskID] = &TaskStatus{ + taskID: taskID, + state: TaskStatePending, + lastRunTime: now, + } + te.completedTasksMutex.Unlock() + + // Queue the task for execution + select { + case te.readyQueue <- st: + st.lastRunTime = now + log.Printf("Task %s queued for execution", taskID) + + // Mark the task as having run once for test stability + if st.interval == 0 { + te.runOnceMutex.Lock() + te.runOnceFlag[taskID] = true + te.runOnceMutex.Unlock() + } + default: + // Queue full, revert the task state + te.completedTasksMutex.Lock() + te.completedTasks[taskID] = &TaskStatus{ + taskID: taskID, + state: TaskStateReady, + } + te.completedTasksMutex.Unlock() + log.Printf("Task %s queue attempt failed, queue full", taskID) + } + } +} + +// executeTask performs the actual execution of a task func (te *TaskExecutor) executeTask(st *ScheduledTask) { + taskID := st.task.ID() + + // Acquire rate limit token te.rateLimit <- struct{}{} defer func() { <-te.rateLimit }() - if err := st.task.Execute(); err != nil { - log.Printf("Task %v failed: %v", st.task, err) + + // Verify task is still in pending state before executing + te.completedTasksMutex.Lock() + status, exists := te.completedTasks[taskID] + + if !exists || status.state != TaskStatePending { + te.completedTasksMutex.Unlock() + log.Printf("Task %s skipped execution - state changed", taskID) + return } -} + + // Update state to running + te.completedTasks[taskID] = &TaskStatus{ + taskID: taskID, + state: TaskStateRunning, + lastRunTime: time.Now(), + } + te.completedTasksMutex.Unlock() + + // Execute the task + err := st.task.Execute() + + // Update final task status + te.completedTasksMutex.Lock() + finalState := TaskStateCompleted + if err != nil { + finalState = TaskStateFailed + } + + te.completedTasks[taskID] = &TaskStatus{ + taskID: taskID, + state: finalState, + error: err, + lastRunTime: time.Now(), + } + te.completedTasksMutex.Unlock() + + // Log the result + if err != nil { + log.Printf("Task %s failed: %v", taskID, err) + } else { + log.Printf("Task %s completed successfully", taskID) + } +} \ No newline at end of file diff --git a/task_test.go b/task_test.go index 31f043b..5056107 100644 --- a/task_test.go +++ b/task_test.go @@ -7,15 +7,30 @@ import ( ) type mockTask struct { - executeFunc func() error + id string + dependencies []string + executeFunc func() error +} + +func (m *mockTask) ID() string { + return m.id } func (m *mockTask) Execute() error { return m.executeFunc() } + +func (m *mockTask) Dependencies() []string { + return m.dependencies +} func TestAddTask(t *testing.T) { te := NewTaskExecutor(10) - te.AddTask(&mockTask{}, 1*time.Second) + te.AddTask(&mockTask{ + id: "task1", + executeFunc: func() error { + return nil + }, + }, 1*time.Second) te.Start() // No need for explicit sleep now, as Start() ensures tasks are processed if te.Len() != 1 { @@ -26,6 +41,7 @@ func TestExecuteTaskSuccess(t *testing.T) { te := NewTaskExecutor(10) executeCalled := false te.AddTask(&mockTask{ + id: "task1", executeFunc: func() error { executeCalled = true return nil @@ -42,6 +58,7 @@ func TestExecuteTaskFailure(t *testing.T) { te := NewTaskExecutor(10) executeCalled := false te.AddTask(&mockTask{ + id: "task1", executeFunc: func() error { executeCalled = true return expectedError @@ -58,6 +75,7 @@ func TestRateLimit(t *testing.T) { for i := 0; i < 5; i++ { delay := time.Duration(i) * time.Millisecond te.AddTask(&mockTask{ + id: "task" + string(rune('1'+i)), executeFunc: func() error { return nil }, @@ -79,13 +97,14 @@ func TestZeroInterval(t *testing.T) { te := NewTaskExecutor(10) executeCalled := false te.AddTask(&mockTask{ + id: "task1", executeFunc: func() error { executeCalled = true return nil }, }, 0*time.Second) te.Start() - time.Sleep(50 * time.Millisecond) + time.Sleep(200 * time.Millisecond) if !executeCalled { t.Error("expected execute to be called, but it was not") } @@ -97,6 +116,322 @@ func TestNoTasks(t *testing.T) { // test passed if no panic occurred } +func TestTaskDependencies(t *testing.T) { + te := NewTaskExecutor(10) + + // Create execution tracking variables + executionOrder := make([]string, 0, 3) + executionTimes := make(map[string]time.Time, 3) + done := make(chan bool, 1) // Buffered channel to prevent goroutine leaks + + // Create dependency tree: task3 depends on task2, which depends on task1 + task1 := &mockTask{ + id: "task1", + dependencies: []string{}, + executeFunc: func() error { + executionOrder = append(executionOrder, "task1") + executionTimes["task1"] = time.Now() + return nil + }, + } + + task2 := &mockTask{ + id: "task2", + dependencies: []string{"task1"}, + executeFunc: func() error { + executionOrder = append(executionOrder, "task2") + executionTimes["task2"] = time.Now() + return nil + }, + } + + task3 := &mockTask{ + id: "task3", + dependencies: []string{"task2"}, + executeFunc: func() error { + executionOrder = append(executionOrder, "task3") + executionTimes["task3"] = time.Now() + done <- true + return nil + }, + } + + // Add tasks in reverse dependency order to test correct ordering + te.AddTask(task3, 0) + te.AddTask(task2, 0) + te.AddTask(task1, 0) + + te.Start() + + // Wait for all tasks to complete + select { + case <-done: + // All tasks completed + case <-time.After(2 * time.Second): + t.Fatal("Timed out waiting for tasks to complete") + } + + // Check execution order + if len(executionOrder) != 3 { + t.Fatalf("Expected 3 tasks to execute, got %d", len(executionOrder)) + } + + if executionOrder[0] != "task1" || executionOrder[1] != "task2" || executionOrder[2] != "task3" { + t.Errorf("Tasks executed in wrong order: %v", executionOrder) + } + + // Verify timing (task1 before task2 before task3) + if !executionTimes["task1"].Before(executionTimes["task2"]) { + t.Error("task1 should execute before task2") + } + + if !executionTimes["task2"].Before(executionTimes["task3"]) { + t.Error("task2 should execute before task3") + } +} + +func TestComplexDependencies(t *testing.T) { + te := NewTaskExecutor(10) + + // Create a more complex dependency graph + // task4 depends on task2 and task3 + // task2 and task3 both depend on task1 + + executed := make(map[string]bool) + done := make(chan bool, 1) // Buffered channel + + + task1 := &mockTask{ + id: "task1", + dependencies: []string{}, + executeFunc: func() error { + executed["task1"] = true + return nil + }, + } + task2 := &mockTask{ + id: "task2", + dependencies: []string{"task1"}, + executeFunc: func() error { + executed["task2"] = true + return nil + }, + } + task3 := &mockTask{ + id: "task3", + dependencies: []string{"task1"}, + executeFunc: func() error { + executed["task3"] = true + return nil + }, + } + task4 := &mockTask{ + id: "task4", + dependencies: []string{"task2", "task3"}, + executeFunc: func() error { + executed["task4"] = true + done <- true + return nil + }, + } + + // Add tasks in arbitrary order + te.AddTask(task4, 0) + te.AddTask(task2, 0) + te.AddTask(task1, 0) + te.AddTask(task3, 0) + + te.Start() + + // Wait for tasks to complete + select { + case <-done: + // Task4 completed + case <-time.After(2 * time.Second): + t.Fatal("Timed out waiting for tasks to complete") + } + + // Check all tasks executed + for _, id := range []string{"task1", "task2", "task3", "task4"} { + if !executed[id] { + t.Errorf("Task %s was not executed", id) + } + } +} + +func TestFailedDependency(t *testing.T) { + te := NewTaskExecutor(10) + + executed := make(map[string]bool) + done := make(chan bool, 1) // Buffered channel + + // task1 fails, task2 depends on task1, so task2 should never execute + task1 := &mockTask{ + id: "task1", + dependencies: []string{}, + executeFunc: func() error { + executed["task1"] = true + return errors.New("task1 failed") + }, + } + + task2 := &mockTask{ + id: "task2", + dependencies: []string{"task1"}, + executeFunc: func() error { + executed["task2"] = true + done <- true + return nil + }, + } + + // Add a task3 that doesn't depend on anything as a signal + task3 := &mockTask{ + id: "task3", + dependencies: []string{}, + executeFunc: func() error { + executed["task3"] = true + done <- true + return nil + }, + } + + te.AddTask(task1, 0) + te.AddTask(task2, 0) + te.AddTask(task3, 0) + + te.Start() + + // Wait for task3 to complete + select { + case <-done: + // Task3 completed + case <-time.After(1 * time.Second): + t.Fatal("Timed out waiting for task3 to complete") + } + + // Give a little extra time for any other tasks that might execute + time.Sleep(200 * time.Millisecond) + + // Check that task1 executed and failed + if !executed["task1"] { + t.Error("task1 should have executed") + } + + // Check that task2 did not execute due to failed dependency + if executed["task2"] { + t.Error("task2 should not have executed because task1 failed") + } + + // Check that task3 executed (independent task) + if !executed["task3"] { + t.Error("task3 should have executed") + } +} + +func TestCircularDependencies(t *testing.T) { + te := NewTaskExecutor(10) + + // Create a circular dependency: task1 -> task2 -> task3 -> task1 + task1 := &mockTask{ + id: "task1", + dependencies: []string{"task3"}, + executeFunc: func() error { + return nil + }, + } + + task2 := &mockTask{ + id: "task2", + dependencies: []string{"task1"}, + executeFunc: func() error { + return nil + }, + } + + task3 := &mockTask{ + id: "task3", + dependencies: []string{"task2"}, + executeFunc: func() error { + return nil + }, + } + + // The first two tasks should be added successfully + te.AddTask(task1, 0) + te.AddTask(task2, 0) + + // This should be rejected due to circular dependency detection + te.AddTask(task3, 0) + + // Start the executor + te.Start() + + // Wait a bit to ensure tasks don't execute + time.Sleep(500 * time.Millisecond) + + // There should still be only 2 tasks (task3 should have been rejected) + if te.Len() != 2 { + t.Errorf("Expected 2 tasks, got %d", te.Len()) + } +} + +func TestNonExistentDependency(t *testing.T) { + te := NewTaskExecutor(10) + + // Create a task that depends on a non-existent task + executed := make(map[string]bool) + done := make(chan bool, 1) + + task1 := &mockTask{ + id: "task1", + dependencies: []string{"non-existent-task"}, + executeFunc: func() error { + executed["task1"] = true + done <- true + return nil + }, + } + + // Add a control task to signal completion + task2 := &mockTask{ + id: "task2", + dependencies: []string{}, + executeFunc: func() error { + executed["task2"] = true + done <- true + return nil + }, + } + + // Both tasks should be added, but task1 won't run due to missing dependency + te.AddTask(task1, 0) + te.AddTask(task2, 0) + + te.Start() + + // Wait for the signal task to complete + select { + case <-done: + // Task2 completed + case <-time.After(1 * time.Second): + t.Fatal("Timed out waiting for tasks to complete") + } + + // Give extra time for any other tasks + time.Sleep(200 * time.Millisecond) + + // Task2 should have executed + if !executed["task2"] { + t.Error("task2 should have executed") + } + + // Task1 should not have executed due to missing dependency + if executed["task1"] { + t.Error("task1 should not have executed due to missing dependency") + } +} + var ErrorTask = errors.New("task failed") var executeTaskTestCases = []struct { name string @@ -121,6 +456,7 @@ func TestExecuteTask(t *testing.T) { te := NewTaskExecutor(10) executeCalled := false te.AddTask(&mockTask{ + id: "task1", executeFunc: func() error { executeCalled = true return tc.executeError @@ -136,4 +472,4 @@ func TestExecuteTask(t *testing.T) { } }) } -} +} \ No newline at end of file