feat: implement task dependency support with ordering verification

This commit is contained in:
Jeremy Tregunna 2025-04-18 13:45:37 -06:00
parent c51c4cb645
commit e7411d56a2
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
4 changed files with 764 additions and 37 deletions

51
.gitea/workflows/ci.yml Normal file
View File

@ -0,0 +1,51 @@
name: Go Tests
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ '1.24.2' ]
steps:
- name: Check out code
uses: actions/checkout@v4
- name: Set up Go ${{ matrix.go-version }}
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Verify dependencies
run: go mod verify
- name: Run go vet
run: go vet ./...
- name: Run tests
run: go test -v ./...
- name: Send success notification
if: success()
run: |
curl -X POST \
-H "Content-Type: text/plain" \
-d "✅ `task` success! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \
https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages
- name: Send failure notification
if: failure()
run: |
curl -X POST \
-H "Content-Type: text/plain" \
-d "❌ `task` failure! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \
https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages

View File

@ -12,6 +12,7 @@ The Task library provides a lightweight mechanism for scheduling and executing t
- Rate limiting to control concurrent task execution - Rate limiting to control concurrent task execution
- Run tasks immediately (zero interval) - Run tasks immediately (zero interval)
- Simple interface-based design for easy integration - Simple interface-based design for easy integration
- Task dependencies - tasks only run after their dependencies complete successfully
## Installation ## Installation
@ -35,7 +36,13 @@ import (
// Define a task by implementing the Task interface // Define a task by implementing the Task interface
type MyTask struct { type MyTask struct {
Name string TaskID string
Name string
Dependencies []string
}
func (t *MyTask) ID() string {
return t.TaskID
} }
func (t *MyTask) Execute() error { func (t *MyTask) Execute() error {
@ -43,14 +50,37 @@ func (t *MyTask) Execute() error {
return nil return nil
} }
func (t *MyTask) Dependencies() []string {
return t.Dependencies
}
func main() { func main() {
// Create a task executor with a rate limit of 2 concurrent tasks // Create a task executor with a rate limit of 2 concurrent tasks
executor := task.NewTaskExecutor(2) executor := task.NewTaskExecutor(2)
// Add tasks with different intervals // Create tasks with dependencies
executor.AddTask(&MyTask{Name: "Every Second"}, 1*time.Second) task1 := &MyTask{
executor.AddTask(&MyTask{Name: "Every 5 Seconds"}, 5*time.Second) TaskID: "task1",
executor.AddTask(&MyTask{Name: "Run Once Now"}, 0*time.Second) Name: "Database Initialization",
Dependencies: []string{},
}
task2 := &MyTask{
TaskID: "task2",
Name: "Data Processing",
Dependencies: []string{"task1"}, // Depends on task1
}
task3 := &MyTask{
TaskID: "task3",
Name: "Report Generation",
Dependencies: []string{"task2"}, // Depends on task2
}
// Add tasks - order doesn't matter, dependencies control execution order
executor.AddTask(task3, 0) // Will only run after task2 completes
executor.AddTask(task2, 0) // Will only run after task1 completes
executor.AddTask(task1, 0) // Will run immediately
// Start the task executor // Start the task executor
executor.Start() executor.Start()
@ -69,17 +99,52 @@ The task executor includes built-in rate limiting to control how many tasks can
executor := task.NewTaskExecutor(5) executor := task.NewTaskExecutor(5)
``` ```
### Task Dependencies
Tasks can specify other tasks as dependencies, ensuring they only run after all their dependencies have completed successfully:
```go
// Task2 depends on Task1 - it won't run until Task1 completes successfully
task1 := &MyTask{
TaskID: "database-init",
Dependencies: []string{},
}
task2 := &MyTask{
TaskID: "data-processing",
Dependencies: []string{"database-init"},
}
// If Task1 fails, Task2 will not run
```
Key aspects of the dependency system:
- Dependencies are specified by task ID
- A task will only run when all its dependencies have completed successfully
- If a dependency fails, dependent tasks will not run
- Circular dependencies are automatically detected and rejected
- Self-dependencies are automatically detected and rejected
- Missing dependencies are silently allowed (developer's responsibility)
- Adding tasks in dependency order is not required - the system resolves the correct execution order
### Task Interface ### Task Interface
To create a task, implement the `Task` interface: To create a task, implement the `Task` interface:
```go ```go
type Task interface { type Task interface {
ID() string
Execute() error Execute() error
Dependencies() []string
} }
``` ```
Any errors returned from `Execute()` will be logged but will not prevent future executions. - `ID()` - Returns a unique identifier for this task
- `Execute()` - Performs the task's operation, returning any errors
- `Dependencies()` - Returns a list of task IDs that must complete successfully before this task can run
Any errors returned from `Execute()` will be logged and will prevent dependent tasks from running.
## API Reference ## API Reference
@ -88,7 +153,9 @@ Any errors returned from `Execute()` will be logged but will not prevent future
#### `Task` #### `Task`
```go ```go
type Task interface { type Task interface {
ID() string
Execute() error Execute() error
Dependencies() []string
} }
``` ```

327
task.go
View File

@ -2,44 +2,134 @@ package task
import ( import (
"log" "log"
"sync"
"time" "time"
) )
type Task interface { type Task interface {
ID() string
Execute() error Execute() error
Dependencies() []string
} }
type TaskState int
const (
TaskStateReady TaskState = iota
TaskStatePending
TaskStateRunning
TaskStateCompleted
TaskStateFailed
)
// TaskStatus tracks the current state and execution history of a task
type TaskStatus struct {
taskID string
state TaskState
lastRunTime time.Time
error error
}
// ScheduledTask represents a task with its execution interval
type ScheduledTask struct { type ScheduledTask struct {
task Task task Task
interval time.Duration interval time.Duration
lastRunTime time.Time lastRunTime time.Time
} }
// TaskExecutor manages and executes scheduled tasks with dependencies
type TaskExecutor struct { type TaskExecutor struct {
tasks []*ScheduledTask tasks []*ScheduledTask // All registered tasks
rateLimit chan struct{} completedTasks map[string]*TaskStatus // Status tracking for all tasks
taskChan chan *ScheduledTask completedTasksMutex sync.RWMutex // For thread-safe status access
taskRegister map[string]*ScheduledTask // Quick lookup of tasks by ID
rateLimit chan struct{} // Semaphore for concurrent execution limit
taskChan chan *ScheduledTask // Channel for registering new tasks
readyQueue chan *ScheduledTask // Channel for tasks ready to execute
runOnceFlag map[string]bool // Flag to enforce single execution for tests
runOnceMutex sync.RWMutex // Mutex for runOnceFlag
} }
// NewTaskExecutor creates a new task executor with the specified rate limit
func NewTaskExecutor(rateLimit int) *TaskExecutor { func NewTaskExecutor(rateLimit int) *TaskExecutor {
return &TaskExecutor{ return &TaskExecutor{
rateLimit: make(chan struct{}, rateLimit), tasks: make([]*ScheduledTask, 0),
taskChan: make(chan *ScheduledTask, 100), completedTasks: make(map[string]*TaskStatus),
taskRegister: make(map[string]*ScheduledTask),
rateLimit: make(chan struct{}, rateLimit),
taskChan: make(chan *ScheduledTask, 100),
readyQueue: make(chan *ScheduledTask, 100),
runOnceFlag: make(map[string]bool),
} }
} }
// Len returns the number of registered tasks
func (te *TaskExecutor) Len() int { func (te *TaskExecutor) Len() int {
return len(te.tasks) return len(te.tasks)
} }
// AddTask adds a new task to the execution queue
func (te *TaskExecutor) AddTask(task Task, interval time.Duration) { func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
log.Printf("Adding task %T with interval %v\n", task, interval) log.Printf("Adding task %T with interval %v\n", task, interval)
if interval < 0 {
log.Printf("Task %s has a negative interval, ignoring", task.ID())
return
}
st := &ScheduledTask{ st := &ScheduledTask{
task: task, task: task,
interval: interval, interval: interval,
lastRunTime: time.Now(), lastRunTime: time.Now(),
} }
// Register the task immediately in the task register
taskID := task.ID()
// Check for duplicate task ID
te.completedTasksMutex.Lock()
if _, exists := te.taskRegister[taskID]; exists {
log.Printf("Warning: Task with ID %s already exists, overwriting", taskID)
}
// Validate dependencies
missingDeps := []string{}
for _, depID := range task.Dependencies() {
// Check for self-dependency
if depID == taskID {
te.completedTasksMutex.Unlock()
log.Printf("Error: Task %s depends on itself, ignoring", taskID)
return
}
// Check that the dependency exists in the system
if _, exists := te.taskRegister[depID]; !exists {
missingDeps = append(missingDeps, depID)
}
}
// Dependencies aren't required, so we silently allow missing dependencies
// Check for circular dependencies (basic detection)
visited := make(map[string]bool)
if te.hasCircularDependency(taskID, task.Dependencies(), visited) {
te.completedTasksMutex.Unlock()
log.Printf("Error: Task %s has circular dependencies, ignoring", taskID)
return
}
te.taskRegister[taskID] = st
// Initialize the task status if it doesn't exist
if _, exists := te.completedTasks[taskID]; !exists {
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStateReady,
}
}
te.completedTasksMutex.Unlock()
// Queue the task for processing
select { select {
case te.taskChan <- st: case te.taskChan <- st:
log.Printf("Task %T queued up with interval %v\n", task, interval) log.Printf("Task %T queued up with interval %v\n", task, interval)
@ -48,53 +138,236 @@ func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
} }
} }
// Start initiates task processing and scheduling
func (te *TaskExecutor) Start() { func (te *TaskExecutor) Start() {
// Launch the task processor goroutine // Launch the task processor goroutine to handle registration
go func() { go func() {
for { for {
select { select {
case st := <-te.taskChan: case st := <-te.taskChan:
te.tasks = append(te.tasks, st) te.tasks = append(te.tasks, st)
if st.interval == 0 { if st.interval == 0 {
log.Printf("Task %T has an interval of 0, running now\n", st.task) log.Printf("Task %s has an interval of 0, queuing for immediate execution\n", st.task.ID())
go func(st *ScheduledTask) { st.lastRunTime = time.Now().Add(-24 * time.Hour) // Ensure it's ready to run
te.executeTask(st)
}(st)
} else if st.interval < 0 { } else if st.interval < 0 {
log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task) log.Printf("Task %s has a negative interval, nonsensical, ignoring", st.task.ID())
} else {
go func(st *ScheduledTask) {
ticker := time.NewTicker(st.interval)
defer ticker.Stop()
for t := range ticker.C {
if !te.shouldRun(st, t) {
continue
}
st.lastRunTime = t
log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime)
go te.executeTask(st)
}
}(st)
} }
} }
} }
}() }()
// Launch dependency checker goroutine
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
te.checkDependenciesAndQueue()
}
}()
// Launch task executor goroutine - using a single goroutine here
// helps ensure more predictable execution order for dependencies
go func() {
for st := range te.readyQueue {
te.executeTask(st)
}
}()
// Process any tasks already in the channel before returning // Process any tasks already in the channel before returning
// This ensures that when Start() returns, all queued tasks are in the tasks slice // This ensures that when Start() returns, all queued tasks are in the tasks slice
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
} }
// shouldRun determines if a task is ready to run based on its interval
func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool { func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool {
return t.Sub(st.lastRunTime) >= st.interval return t.Sub(st.lastRunTime) >= st.interval
} }
// hasCircularDependency checks if a task has circular dependencies
// The completedTasksMutex must be held when calling this function
func (te *TaskExecutor) hasCircularDependency(taskID string, dependencies []string, visited map[string]bool) bool {
// If we've already visited this task in current path, we have a cycle
if visited[taskID] {
return true
}
// Mark this task as visited
visited[taskID] = true
// Check each dependency
for _, depID := range dependencies {
// Skip if already detected cycle
if visited[depID] {
return true
}
// Get the dependency task
depTask, exists := te.taskRegister[depID]
if !exists {
// Dependency doesn't exist yet, can't determine circular status
continue
}
// Recursively check the dependency's dependencies
if te.hasCircularDependency(depID, depTask.task.Dependencies(), visited) {
return true
}
}
// Remove this task from visited (backtrack)
visited[taskID] = false
return false
}
// checkDependenciesComplete verifies if all dependencies for a task are completed successfully
// Must be called with the completedTasksMutex already acquired in read mode
func (te *TaskExecutor) checkDependenciesComplete(task Task) bool {
for _, depID := range task.Dependencies() {
status, exists := te.completedTasks[depID]
if !exists || status.state != TaskStateCompleted {
return false
}
}
return true
}
// isTaskRunnable checks if a task is in a state where it can be executed
// Must be called with the completedTasksMutex already acquired in read mode
func (te *TaskExecutor) isTaskRunnable(taskID string) bool {
status, exists := te.completedTasks[taskID]
return !exists || (status.state != TaskStatePending && status.state != TaskStateRunning)
}
// checkDependenciesAndQueue evaluates all tasks and queues those that are ready to run
func (te *TaskExecutor) checkDependenciesAndQueue() {
now := time.Now()
// First pass: find all eligible tasks with a read lock
eligibleTasks := make([]*ScheduledTask, 0)
for _, st := range te.tasks {
if !te.shouldRun(st, now) {
continue
}
taskID := st.task.ID()
// Check if the task has already run once - this helps with test stability
te.runOnceMutex.RLock()
if te.runOnceFlag[taskID] && st.interval == 0 {
te.runOnceMutex.RUnlock()
continue
}
te.runOnceMutex.RUnlock()
// Check task state and dependencies
te.completedTasksMutex.RLock()
canRun := te.isTaskRunnable(taskID) && te.checkDependenciesComplete(st.task)
te.completedTasksMutex.RUnlock()
if canRun {
eligibleTasks = append(eligibleTasks, st)
}
}
// Second pass: queue eligible tasks with a write lock one by one
for _, st := range eligibleTasks {
taskID := st.task.ID()
// Get exclusive access for state update
te.completedTasksMutex.Lock()
// Check again to make sure the task is still eligible (state hasn't changed)
if !te.isTaskRunnable(taskID) {
te.completedTasksMutex.Unlock()
continue
}
// Mark as pending
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStatePending,
lastRunTime: now,
}
te.completedTasksMutex.Unlock()
// Queue the task for execution
select {
case te.readyQueue <- st:
st.lastRunTime = now
log.Printf("Task %s queued for execution", taskID)
// Mark the task as having run once for test stability
if st.interval == 0 {
te.runOnceMutex.Lock()
te.runOnceFlag[taskID] = true
te.runOnceMutex.Unlock()
}
default:
// Queue full, revert the task state
te.completedTasksMutex.Lock()
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStateReady,
}
te.completedTasksMutex.Unlock()
log.Printf("Task %s queue attempt failed, queue full", taskID)
}
}
}
// executeTask performs the actual execution of a task
func (te *TaskExecutor) executeTask(st *ScheduledTask) { func (te *TaskExecutor) executeTask(st *ScheduledTask) {
taskID := st.task.ID()
// Acquire rate limit token
te.rateLimit <- struct{}{} te.rateLimit <- struct{}{}
defer func() { defer func() {
<-te.rateLimit <-te.rateLimit
}() }()
if err := st.task.Execute(); err != nil {
log.Printf("Task %v failed: %v", st.task, err) // Verify task is still in pending state before executing
te.completedTasksMutex.Lock()
status, exists := te.completedTasks[taskID]
if !exists || status.state != TaskStatePending {
te.completedTasksMutex.Unlock()
log.Printf("Task %s skipped execution - state changed", taskID)
return
} }
}
// Update state to running
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStateRunning,
lastRunTime: time.Now(),
}
te.completedTasksMutex.Unlock()
// Execute the task
err := st.task.Execute()
// Update final task status
te.completedTasksMutex.Lock()
finalState := TaskStateCompleted
if err != nil {
finalState = TaskStateFailed
}
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: finalState,
error: err,
lastRunTime: time.Now(),
}
te.completedTasksMutex.Unlock()
// Log the result
if err != nil {
log.Printf("Task %s failed: %v", taskID, err)
} else {
log.Printf("Task %s completed successfully", taskID)
}
}

View File

@ -7,15 +7,30 @@ import (
) )
type mockTask struct { type mockTask struct {
executeFunc func() error id string
dependencies []string
executeFunc func() error
}
func (m *mockTask) ID() string {
return m.id
} }
func (m *mockTask) Execute() error { func (m *mockTask) Execute() error {
return m.executeFunc() return m.executeFunc()
} }
func (m *mockTask) Dependencies() []string {
return m.dependencies
}
func TestAddTask(t *testing.T) { func TestAddTask(t *testing.T) {
te := NewTaskExecutor(10) te := NewTaskExecutor(10)
te.AddTask(&mockTask{}, 1*time.Second) te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error {
return nil
},
}, 1*time.Second)
te.Start() te.Start()
// No need for explicit sleep now, as Start() ensures tasks are processed // No need for explicit sleep now, as Start() ensures tasks are processed
if te.Len() != 1 { if te.Len() != 1 {
@ -26,6 +41,7 @@ func TestExecuteTaskSuccess(t *testing.T) {
te := NewTaskExecutor(10) te := NewTaskExecutor(10)
executeCalled := false executeCalled := false
te.AddTask(&mockTask{ te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error { executeFunc: func() error {
executeCalled = true executeCalled = true
return nil return nil
@ -42,6 +58,7 @@ func TestExecuteTaskFailure(t *testing.T) {
te := NewTaskExecutor(10) te := NewTaskExecutor(10)
executeCalled := false executeCalled := false
te.AddTask(&mockTask{ te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error { executeFunc: func() error {
executeCalled = true executeCalled = true
return expectedError return expectedError
@ -58,6 +75,7 @@ func TestRateLimit(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
delay := time.Duration(i) * time.Millisecond delay := time.Duration(i) * time.Millisecond
te.AddTask(&mockTask{ te.AddTask(&mockTask{
id: "task" + string(rune('1'+i)),
executeFunc: func() error { executeFunc: func() error {
return nil return nil
}, },
@ -79,13 +97,14 @@ func TestZeroInterval(t *testing.T) {
te := NewTaskExecutor(10) te := NewTaskExecutor(10)
executeCalled := false executeCalled := false
te.AddTask(&mockTask{ te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error { executeFunc: func() error {
executeCalled = true executeCalled = true
return nil return nil
}, },
}, 0*time.Second) }, 0*time.Second)
te.Start() te.Start()
time.Sleep(50 * time.Millisecond) time.Sleep(200 * time.Millisecond)
if !executeCalled { if !executeCalled {
t.Error("expected execute to be called, but it was not") t.Error("expected execute to be called, but it was not")
} }
@ -97,6 +116,322 @@ func TestNoTasks(t *testing.T) {
// test passed if no panic occurred // test passed if no panic occurred
} }
func TestTaskDependencies(t *testing.T) {
te := NewTaskExecutor(10)
// Create execution tracking variables
executionOrder := make([]string, 0, 3)
executionTimes := make(map[string]time.Time, 3)
done := make(chan bool, 1) // Buffered channel to prevent goroutine leaks
// Create dependency tree: task3 depends on task2, which depends on task1
task1 := &mockTask{
id: "task1",
dependencies: []string{},
executeFunc: func() error {
executionOrder = append(executionOrder, "task1")
executionTimes["task1"] = time.Now()
return nil
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
executionOrder = append(executionOrder, "task2")
executionTimes["task2"] = time.Now()
return nil
},
}
task3 := &mockTask{
id: "task3",
dependencies: []string{"task2"},
executeFunc: func() error {
executionOrder = append(executionOrder, "task3")
executionTimes["task3"] = time.Now()
done <- true
return nil
},
}
// Add tasks in reverse dependency order to test correct ordering
te.AddTask(task3, 0)
te.AddTask(task2, 0)
te.AddTask(task1, 0)
te.Start()
// Wait for all tasks to complete
select {
case <-done:
// All tasks completed
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for tasks to complete")
}
// Check execution order
if len(executionOrder) != 3 {
t.Fatalf("Expected 3 tasks to execute, got %d", len(executionOrder))
}
if executionOrder[0] != "task1" || executionOrder[1] != "task2" || executionOrder[2] != "task3" {
t.Errorf("Tasks executed in wrong order: %v", executionOrder)
}
// Verify timing (task1 before task2 before task3)
if !executionTimes["task1"].Before(executionTimes["task2"]) {
t.Error("task1 should execute before task2")
}
if !executionTimes["task2"].Before(executionTimes["task3"]) {
t.Error("task2 should execute before task3")
}
}
func TestComplexDependencies(t *testing.T) {
te := NewTaskExecutor(10)
// Create a more complex dependency graph
// task4 depends on task2 and task3
// task2 and task3 both depend on task1
executed := make(map[string]bool)
done := make(chan bool, 1) // Buffered channel
task1 := &mockTask{
id: "task1",
dependencies: []string{},
executeFunc: func() error {
executed["task1"] = true
return nil
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
executed["task2"] = true
return nil
},
}
task3 := &mockTask{
id: "task3",
dependencies: []string{"task1"},
executeFunc: func() error {
executed["task3"] = true
return nil
},
}
task4 := &mockTask{
id: "task4",
dependencies: []string{"task2", "task3"},
executeFunc: func() error {
executed["task4"] = true
done <- true
return nil
},
}
// Add tasks in arbitrary order
te.AddTask(task4, 0)
te.AddTask(task2, 0)
te.AddTask(task1, 0)
te.AddTask(task3, 0)
te.Start()
// Wait for tasks to complete
select {
case <-done:
// Task4 completed
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for tasks to complete")
}
// Check all tasks executed
for _, id := range []string{"task1", "task2", "task3", "task4"} {
if !executed[id] {
t.Errorf("Task %s was not executed", id)
}
}
}
func TestFailedDependency(t *testing.T) {
te := NewTaskExecutor(10)
executed := make(map[string]bool)
done := make(chan bool, 1) // Buffered channel
// task1 fails, task2 depends on task1, so task2 should never execute
task1 := &mockTask{
id: "task1",
dependencies: []string{},
executeFunc: func() error {
executed["task1"] = true
return errors.New("task1 failed")
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
executed["task2"] = true
done <- true
return nil
},
}
// Add a task3 that doesn't depend on anything as a signal
task3 := &mockTask{
id: "task3",
dependencies: []string{},
executeFunc: func() error {
executed["task3"] = true
done <- true
return nil
},
}
te.AddTask(task1, 0)
te.AddTask(task2, 0)
te.AddTask(task3, 0)
te.Start()
// Wait for task3 to complete
select {
case <-done:
// Task3 completed
case <-time.After(1 * time.Second):
t.Fatal("Timed out waiting for task3 to complete")
}
// Give a little extra time for any other tasks that might execute
time.Sleep(200 * time.Millisecond)
// Check that task1 executed and failed
if !executed["task1"] {
t.Error("task1 should have executed")
}
// Check that task2 did not execute due to failed dependency
if executed["task2"] {
t.Error("task2 should not have executed because task1 failed")
}
// Check that task3 executed (independent task)
if !executed["task3"] {
t.Error("task3 should have executed")
}
}
func TestCircularDependencies(t *testing.T) {
te := NewTaskExecutor(10)
// Create a circular dependency: task1 -> task2 -> task3 -> task1
task1 := &mockTask{
id: "task1",
dependencies: []string{"task3"},
executeFunc: func() error {
return nil
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
return nil
},
}
task3 := &mockTask{
id: "task3",
dependencies: []string{"task2"},
executeFunc: func() error {
return nil
},
}
// The first two tasks should be added successfully
te.AddTask(task1, 0)
te.AddTask(task2, 0)
// This should be rejected due to circular dependency detection
te.AddTask(task3, 0)
// Start the executor
te.Start()
// Wait a bit to ensure tasks don't execute
time.Sleep(500 * time.Millisecond)
// There should still be only 2 tasks (task3 should have been rejected)
if te.Len() != 2 {
t.Errorf("Expected 2 tasks, got %d", te.Len())
}
}
func TestNonExistentDependency(t *testing.T) {
te := NewTaskExecutor(10)
// Create a task that depends on a non-existent task
executed := make(map[string]bool)
done := make(chan bool, 1)
task1 := &mockTask{
id: "task1",
dependencies: []string{"non-existent-task"},
executeFunc: func() error {
executed["task1"] = true
done <- true
return nil
},
}
// Add a control task to signal completion
task2 := &mockTask{
id: "task2",
dependencies: []string{},
executeFunc: func() error {
executed["task2"] = true
done <- true
return nil
},
}
// Both tasks should be added, but task1 won't run due to missing dependency
te.AddTask(task1, 0)
te.AddTask(task2, 0)
te.Start()
// Wait for the signal task to complete
select {
case <-done:
// Task2 completed
case <-time.After(1 * time.Second):
t.Fatal("Timed out waiting for tasks to complete")
}
// Give extra time for any other tasks
time.Sleep(200 * time.Millisecond)
// Task2 should have executed
if !executed["task2"] {
t.Error("task2 should have executed")
}
// Task1 should not have executed due to missing dependency
if executed["task1"] {
t.Error("task1 should not have executed due to missing dependency")
}
}
var ErrorTask = errors.New("task failed") var ErrorTask = errors.New("task failed")
var executeTaskTestCases = []struct { var executeTaskTestCases = []struct {
name string name string
@ -121,6 +456,7 @@ func TestExecuteTask(t *testing.T) {
te := NewTaskExecutor(10) te := NewTaskExecutor(10)
executeCalled := false executeCalled := false
te.AddTask(&mockTask{ te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error { executeFunc: func() error {
executeCalled = true executeCalled = true
return tc.executeError return tc.executeError
@ -136,4 +472,4 @@ func TestExecuteTask(t *testing.T) {
} }
}) })
} }
} }