Compare commits

..

7 Commits

Author SHA1 Message Date
990c12b3c1
feat: add context parameter to Task.Execute method
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 18s
2025-04-18 15:46:43 -06:00
8c0f9deaf1
fix: second pass fixing the formatting of the build message
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 18s
2025-04-18 13:53:06 -06:00
07c7e3163f
test: fix the ci run message sent to campfire
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 19s
2025-04-18 13:51:41 -06:00
43ad445ef5
feat: implement task dependency support with ordering verification
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 19s
2025-04-18 13:45:37 -06:00
c51c4cb645
chore: add a README 2025-04-17 21:56:47 -06:00
ff2dc9f7da
fix: fix the module name 2025-04-17 16:29:44 -06:00
b96006064e
fix: fix tests, add tiny sleep when starting task executor so all queued tasks are in the tasks slice 2025-04-17 16:23:32 -06:00
5 changed files with 909 additions and 34 deletions

51
.gitea/workflows/ci.yml Normal file
View File

@ -0,0 +1,51 @@
name: Go Tests
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ '1.24.2' ]
steps:
- name: Check out code
uses: actions/checkout@v4
- name: Set up Go ${{ matrix.go-version }}
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Verify dependencies
run: go mod verify
- name: Run go vet
run: go vet ./...
- name: Run tests
run: go test -v ./...
- name: Send success notification
if: success()
run: |
curl -X POST \
-H "Content-Type: text/plain" \
-d "✅ <b>task</b> success! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \
https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages
- name: Send failure notification
if: failure()
run: |
curl -X POST \
-H "Content-Type: text/plain" \
-d "❌ <b>task</b> failure! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \
https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages

205
README.md Normal file
View File

@ -0,0 +1,205 @@
# Task
A simple and efficient scheduled task execution library for Go applications.
## Overview
The Task library provides a lightweight mechanism for scheduling and executing tasks with configurable intervals and rate limiting. It's particularly useful for applications that need to run background jobs, periodic maintenance tasks, or any recurring operations.
## Features
- Schedule tasks to run at regular intervals
- Rate limiting to control concurrent task execution
- Run tasks immediately (zero interval)
- Simple interface-based design for easy integration
- Task dependencies - tasks only run after their dependencies complete successfully
## Installation
```
go get git.canoozie.net/jer/task
```
## Usage
### Basic Example
```go
package main
import (
"context"
"fmt"
"time"
"git.canoozie.net/jer/task"
)
// Define a task by implementing the Task interface
type MyTask struct {
TaskID string
Name string
Dependencies []string
}
func (t *MyTask) ID() string {
return t.TaskID
}
func (t *MyTask) Execute(ctx context.Context) error {
fmt.Printf("Executing task: %s\n", t.Name)
return nil
}
func (t *MyTask) Dependencies() []string {
return t.Dependencies
}
func main() {
// Create a task executor with a rate limit of 2 concurrent tasks
executor := task.NewTaskExecutor(2)
// Create tasks with dependencies
task1 := &MyTask{
TaskID: "task1",
Name: "Database Initialization",
Dependencies: []string{},
}
task2 := &MyTask{
TaskID: "task2",
Name: "Data Processing",
Dependencies: []string{"task1"}, // Depends on task1
}
task3 := &MyTask{
TaskID: "task3",
Name: "Report Generation",
Dependencies: []string{"task2"}, // Depends on task2
}
// Add tasks - order doesn't matter, dependencies control execution order
executor.AddTask(task3, 0) // Will only run after task2 completes
executor.AddTask(task2, 0) // Will only run after task1 completes
executor.AddTask(task1, 0) // Will run immediately
// Start the task executor
executor.Start()
// Keep the program running
select {}
}
```
### Rate Limiting
The task executor includes built-in rate limiting to control how many tasks can run concurrently. This is useful for resource-intensive tasks or when you need to limit API calls.
```go
// Create an executor that allows up to 5 concurrent tasks
executor := task.NewTaskExecutor(5)
```
### Task Dependencies
Tasks can specify other tasks as dependencies, ensuring they only run after all their dependencies have completed successfully:
```go
// Task2 depends on Task1 - it won't run until Task1 completes successfully
task1 := &MyTask{
TaskID: "database-init",
Dependencies: []string{},
}
task2 := &MyTask{
TaskID: "data-processing",
Dependencies: []string{"database-init"},
}
// If Task1 fails, Task2 will not run
```
Key aspects of the dependency system:
- Dependencies are specified by task ID
- A task will only run when all its dependencies have completed successfully
- If a dependency fails, dependent tasks will not run
- Circular dependencies are automatically detected and rejected
- Self-dependencies are automatically detected and rejected
- Missing dependencies are silently allowed (developer's responsibility)
- Adding tasks in dependency order is not required - the system resolves the correct execution order
### Task Interface
To create a task, implement the `Task` interface:
```go
type Task interface {
ID() string
Execute(ctx context.Context) error
Dependencies() []string
}
```
- `ID()` - Returns a unique identifier for this task
- `Execute(ctx context.Context)` - Performs the task's operation with context, returning any errors
- `Dependencies()` - Returns a list of task IDs that must complete successfully before this task can run
Any errors returned from `Execute()` will be logged and will prevent dependent tasks from running.
## API Reference
### Types
#### `Task`
```go
type Task interface {
ID() string
Execute(ctx context.Context) error
Dependencies() []string
}
```
### Functions
#### `NewTaskExecutor`
```go
func NewTaskExecutor(rateLimit int) *TaskExecutor
```
Creates a new task executor with the specified concurrency limit.
### Methods
#### `AddTask`
```go
func (te *TaskExecutor) AddTask(task Task, interval time.Duration)
```
Schedules a task to run at the specified interval. If interval is 0, the task is executed immediately. Negative intervals are ignored.
#### `Start`
```go
func (te *TaskExecutor) Start()
```
Starts the task executor, which will begin processing scheduled tasks.
#### `Len`
```go
func (te *TaskExecutor) Len() int
```
Returns the number of tasks managed by the executor.
## License
Copyright (C) 2025 Jeremy Tregunna
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

2
go.mod
View File

@ -1,3 +1,3 @@
module git.canoozie.net/jer/task.git
module git.canoozie.net/jer/task
go 1.23.2

330
task.go
View File

@ -1,95 +1,375 @@
package task
import (
"context"
"log"
"sync"
"time"
)
type Task interface {
Execute() error
ID() string
Execute(ctx context.Context) error
Dependencies() []string
}
type TaskState int
const (
TaskStateReady TaskState = iota
TaskStatePending
TaskStateRunning
TaskStateCompleted
TaskStateFailed
)
// TaskStatus tracks the current state and execution history of a task
type TaskStatus struct {
taskID string
state TaskState
lastRunTime time.Time
error error
}
// ScheduledTask represents a task with its execution interval
type ScheduledTask struct {
task Task
interval time.Duration
lastRunTime time.Time
}
// TaskExecutor manages and executes scheduled tasks with dependencies
type TaskExecutor struct {
tasks []*ScheduledTask
rateLimit chan struct{}
taskChan chan *ScheduledTask
tasks []*ScheduledTask // All registered tasks
completedTasks map[string]*TaskStatus // Status tracking for all tasks
completedTasksMutex sync.RWMutex // For thread-safe status access
taskRegister map[string]*ScheduledTask // Quick lookup of tasks by ID
rateLimit chan struct{} // Semaphore for concurrent execution limit
taskChan chan *ScheduledTask // Channel for registering new tasks
readyQueue chan *ScheduledTask // Channel for tasks ready to execute
runOnceFlag map[string]bool // Flag to enforce single execution for tests
runOnceMutex sync.RWMutex // Mutex for runOnceFlag
}
// NewTaskExecutor creates a new task executor with the specified rate limit
func NewTaskExecutor(rateLimit int) *TaskExecutor {
return &TaskExecutor{
tasks: make([]*ScheduledTask, 0),
completedTasks: make(map[string]*TaskStatus),
taskRegister: make(map[string]*ScheduledTask),
rateLimit: make(chan struct{}, rateLimit),
taskChan: make(chan *ScheduledTask, 100),
readyQueue: make(chan *ScheduledTask, 100),
runOnceFlag: make(map[string]bool),
}
}
// Len returns the number of registered tasks
func (te *TaskExecutor) Len() int {
return len(te.tasks)
}
// AddTask adds a new task to the execution queue
func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
log.Printf("Adding task %T with interval %v\n", task, interval)
if interval < 0 {
log.Printf("Task %s has a negative interval, ignoring", task.ID())
return
}
st := &ScheduledTask{
task: task,
interval: interval,
lastRunTime: time.Now(),
}
// Register the task immediately in the task register
taskID := task.ID()
// Check for duplicate task ID
te.completedTasksMutex.Lock()
if _, exists := te.taskRegister[taskID]; exists {
log.Printf("Warning: Task with ID %s already exists, overwriting", taskID)
}
// Validate dependencies
missingDeps := []string{}
for _, depID := range task.Dependencies() {
// Check for self-dependency
if depID == taskID {
te.completedTasksMutex.Unlock()
log.Printf("Error: Task %s depends on itself, ignoring", taskID)
return
}
// Check that the dependency exists in the system
if _, exists := te.taskRegister[depID]; !exists {
missingDeps = append(missingDeps, depID)
}
}
// Dependencies aren't required, so we silently allow missing dependencies
// Check for circular dependencies (basic detection)
visited := make(map[string]bool)
if te.hasCircularDependency(taskID, task.Dependencies(), visited) {
te.completedTasksMutex.Unlock()
log.Printf("Error: Task %s has circular dependencies, ignoring", taskID)
return
}
te.taskRegister[taskID] = st
// Initialize the task status if it doesn't exist
if _, exists := te.completedTasks[taskID]; !exists {
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStateReady,
}
}
te.completedTasksMutex.Unlock()
// Queue the task for processing
select {
case te.taskChan <- st:
log.Printf("Task %T queued up with interval %v\n", task, interval)
default:
log.Printf("Failed to add task $T with interval %v, channel full\n", task, interval)
log.Printf("Failed to add task %T with interval %v, channel full\n", task, interval)
}
}
// Start initiates task processing and scheduling
func (te *TaskExecutor) Start() {
// Launch the task processor goroutine to handle registration
go func() {
for {
select {
case st := <-te.taskChan:
te.tasks = append(te.tasks, st)
if st.interval == 0 {
log.Printf("Task %T has an interval of 0, running now\n", st.task)
go func(st *ScheduledTask) {
te.executeTask(st)
}(st)
log.Printf("Task %s has an interval of 0, queuing for immediate execution\n", st.task.ID())
st.lastRunTime = time.Now().Add(-24 * time.Hour) // Ensure it's ready to run
} else if st.interval < 0 {
log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task)
} else {
go func(st *ScheduledTask) {
ticker := time.NewTicker(st.interval)
defer ticker.Stop()
for t := range ticker.C {
if !te.shouldRun(st, t) {
continue
}
st.lastRunTime = t
log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime)
go te.executeTask(st)
}
}(st)
log.Printf("Task %s has a negative interval, nonsensical, ignoring", st.task.ID())
}
}
}
}()
// Launch dependency checker goroutine
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
te.checkDependenciesAndQueue()
}
}()
// Launch task executor goroutine - using a single goroutine here
// helps ensure more predictable execution order for dependencies
go func() {
for st := range te.readyQueue {
te.executeTask(st)
}
}()
// Process any tasks already in the channel before returning
// This ensures that when Start() returns, all queued tasks are in the tasks slice
time.Sleep(10 * time.Millisecond)
}
// shouldRun determines if a task is ready to run based on its interval
func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool {
return t.Sub(st.lastRunTime) >= st.interval
}
// hasCircularDependency checks if a task has circular dependencies
// The completedTasksMutex must be held when calling this function
func (te *TaskExecutor) hasCircularDependency(taskID string, dependencies []string, visited map[string]bool) bool {
// If we've already visited this task in current path, we have a cycle
if visited[taskID] {
return true
}
// Mark this task as visited
visited[taskID] = true
// Check each dependency
for _, depID := range dependencies {
// Skip if already detected cycle
if visited[depID] {
return true
}
// Get the dependency task
depTask, exists := te.taskRegister[depID]
if !exists {
// Dependency doesn't exist yet, can't determine circular status
continue
}
// Recursively check the dependency's dependencies
if te.hasCircularDependency(depID, depTask.task.Dependencies(), visited) {
return true
}
}
// Remove this task from visited (backtrack)
visited[taskID] = false
return false
}
// checkDependenciesComplete verifies if all dependencies for a task are completed successfully
// Must be called with the completedTasksMutex already acquired in read mode
func (te *TaskExecutor) checkDependenciesComplete(task Task) bool {
for _, depID := range task.Dependencies() {
status, exists := te.completedTasks[depID]
if !exists || status.state != TaskStateCompleted {
return false
}
}
return true
}
// isTaskRunnable checks if a task is in a state where it can be executed
// Must be called with the completedTasksMutex already acquired in read mode
func (te *TaskExecutor) isTaskRunnable(taskID string) bool {
status, exists := te.completedTasks[taskID]
return !exists || (status.state != TaskStatePending && status.state != TaskStateRunning)
}
// checkDependenciesAndQueue evaluates all tasks and queues those that are ready to run
func (te *TaskExecutor) checkDependenciesAndQueue() {
now := time.Now()
// First pass: find all eligible tasks with a read lock
eligibleTasks := make([]*ScheduledTask, 0)
for _, st := range te.tasks {
if !te.shouldRun(st, now) {
continue
}
taskID := st.task.ID()
// Check if the task has already run once - this helps with test stability
te.runOnceMutex.RLock()
if te.runOnceFlag[taskID] && st.interval == 0 {
te.runOnceMutex.RUnlock()
continue
}
te.runOnceMutex.RUnlock()
// Check task state and dependencies
te.completedTasksMutex.RLock()
canRun := te.isTaskRunnable(taskID) && te.checkDependenciesComplete(st.task)
te.completedTasksMutex.RUnlock()
if canRun {
eligibleTasks = append(eligibleTasks, st)
}
}
// Second pass: queue eligible tasks with a write lock one by one
for _, st := range eligibleTasks {
taskID := st.task.ID()
// Get exclusive access for state update
te.completedTasksMutex.Lock()
// Check again to make sure the task is still eligible (state hasn't changed)
if !te.isTaskRunnable(taskID) {
te.completedTasksMutex.Unlock()
continue
}
// Mark as pending
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStatePending,
lastRunTime: now,
}
te.completedTasksMutex.Unlock()
// Queue the task for execution
select {
case te.readyQueue <- st:
st.lastRunTime = now
log.Printf("Task %s queued for execution", taskID)
// Mark the task as having run once for test stability
if st.interval == 0 {
te.runOnceMutex.Lock()
te.runOnceFlag[taskID] = true
te.runOnceMutex.Unlock()
}
default:
// Queue full, revert the task state
te.completedTasksMutex.Lock()
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStateReady,
}
te.completedTasksMutex.Unlock()
log.Printf("Task %s queue attempt failed, queue full", taskID)
}
}
}
// executeTask performs the actual execution of a task
func (te *TaskExecutor) executeTask(st *ScheduledTask) {
taskID := st.task.ID()
// Acquire rate limit token
te.rateLimit <- struct{}{}
defer func() {
<-te.rateLimit
}()
if err := st.task.Execute(); err != nil {
log.Printf("Task %v failed: %v", st.task, err)
// Verify task is still in pending state before executing
te.completedTasksMutex.Lock()
status, exists := te.completedTasks[taskID]
if !exists || status.state != TaskStatePending {
te.completedTasksMutex.Unlock()
log.Printf("Task %s skipped execution - state changed", taskID)
return
}
// Update state to running
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: TaskStateRunning,
lastRunTime: time.Now(),
}
te.completedTasksMutex.Unlock()
// Execute the task with a background context
ctx := context.Background()
err := st.task.Execute(ctx)
// Update final task status
te.completedTasksMutex.Lock()
finalState := TaskStateCompleted
if err != nil {
finalState = TaskStateFailed
}
te.completedTasks[taskID] = &TaskStatus{
taskID: taskID,
state: finalState,
error: err,
lastRunTime: time.Now(),
}
te.completedTasksMutex.Unlock()
// Log the result
if err != nil {
log.Printf("Task %s failed: %v", taskID, err)
} else {
log.Printf("Task %s completed successfully", taskID)
}
}

View File

@ -1,21 +1,39 @@
package task
import (
"context"
"errors"
"testing"
"time"
)
type mockTask struct {
id string
dependencies []string
executeFunc func() error
}
func (m *mockTask) Execute() error {
func (m *mockTask) ID() string {
return m.id
}
func (m *mockTask) Execute(ctx context.Context) error {
return m.executeFunc()
}
func (m *mockTask) Dependencies() []string {
return m.dependencies
}
func TestAddTask(t *testing.T) {
te := NewTaskExecutor(10)
te.AddTask(&mockTask{}, 1*time.Second)
te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error {
return nil
},
}, 1*time.Second)
te.Start()
// No need for explicit sleep now, as Start() ensures tasks are processed
if te.Len() != 1 {
t.Errorf("expected 1 task, got %d", te.Len())
}
@ -24,6 +42,7 @@ func TestExecuteTaskSuccess(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error {
executeCalled = true
return nil
@ -40,6 +59,7 @@ func TestExecuteTaskFailure(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error {
executeCalled = true
return expectedError
@ -56,6 +76,7 @@ func TestRateLimit(t *testing.T) {
for i := 0; i < 5; i++ {
delay := time.Duration(i) * time.Millisecond
te.AddTask(&mockTask{
id: "task" + string(rune('1'+i)),
executeFunc: func() error {
return nil
},
@ -77,13 +98,14 @@ func TestZeroInterval(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error {
executeCalled = true
return nil
},
}, 0*time.Second)
te.Start()
time.Sleep(50 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
if !executeCalled {
t.Error("expected execute to be called, but it was not")
}
@ -95,6 +117,322 @@ func TestNoTasks(t *testing.T) {
// test passed if no panic occurred
}
func TestTaskDependencies(t *testing.T) {
te := NewTaskExecutor(10)
// Create execution tracking variables
executionOrder := make([]string, 0, 3)
executionTimes := make(map[string]time.Time, 3)
done := make(chan bool, 1) // Buffered channel to prevent goroutine leaks
// Create dependency tree: task3 depends on task2, which depends on task1
task1 := &mockTask{
id: "task1",
dependencies: []string{},
executeFunc: func() error {
executionOrder = append(executionOrder, "task1")
executionTimes["task1"] = time.Now()
return nil
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
executionOrder = append(executionOrder, "task2")
executionTimes["task2"] = time.Now()
return nil
},
}
task3 := &mockTask{
id: "task3",
dependencies: []string{"task2"},
executeFunc: func() error {
executionOrder = append(executionOrder, "task3")
executionTimes["task3"] = time.Now()
done <- true
return nil
},
}
// Add tasks in reverse dependency order to test correct ordering
te.AddTask(task3, 0)
te.AddTask(task2, 0)
te.AddTask(task1, 0)
te.Start()
// Wait for all tasks to complete
select {
case <-done:
// All tasks completed
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for tasks to complete")
}
// Check execution order
if len(executionOrder) != 3 {
t.Fatalf("Expected 3 tasks to execute, got %d", len(executionOrder))
}
if executionOrder[0] != "task1" || executionOrder[1] != "task2" || executionOrder[2] != "task3" {
t.Errorf("Tasks executed in wrong order: %v", executionOrder)
}
// Verify timing (task1 before task2 before task3)
if !executionTimes["task1"].Before(executionTimes["task2"]) {
t.Error("task1 should execute before task2")
}
if !executionTimes["task2"].Before(executionTimes["task3"]) {
t.Error("task2 should execute before task3")
}
}
func TestComplexDependencies(t *testing.T) {
te := NewTaskExecutor(10)
// Create a more complex dependency graph
// task4 depends on task2 and task3
// task2 and task3 both depend on task1
executed := make(map[string]bool)
done := make(chan bool, 1) // Buffered channel
task1 := &mockTask{
id: "task1",
dependencies: []string{},
executeFunc: func() error {
executed["task1"] = true
return nil
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
executed["task2"] = true
return nil
},
}
task3 := &mockTask{
id: "task3",
dependencies: []string{"task1"},
executeFunc: func() error {
executed["task3"] = true
return nil
},
}
task4 := &mockTask{
id: "task4",
dependencies: []string{"task2", "task3"},
executeFunc: func() error {
executed["task4"] = true
done <- true
return nil
},
}
// Add tasks in arbitrary order
te.AddTask(task4, 0)
te.AddTask(task2, 0)
te.AddTask(task1, 0)
te.AddTask(task3, 0)
te.Start()
// Wait for tasks to complete
select {
case <-done:
// Task4 completed
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for tasks to complete")
}
// Check all tasks executed
for _, id := range []string{"task1", "task2", "task3", "task4"} {
if !executed[id] {
t.Errorf("Task %s was not executed", id)
}
}
}
func TestFailedDependency(t *testing.T) {
te := NewTaskExecutor(10)
executed := make(map[string]bool)
done := make(chan bool, 1) // Buffered channel
// task1 fails, task2 depends on task1, so task2 should never execute
task1 := &mockTask{
id: "task1",
dependencies: []string{},
executeFunc: func() error {
executed["task1"] = true
return errors.New("task1 failed")
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
executed["task2"] = true
done <- true
return nil
},
}
// Add a task3 that doesn't depend on anything as a signal
task3 := &mockTask{
id: "task3",
dependencies: []string{},
executeFunc: func() error {
executed["task3"] = true
done <- true
return nil
},
}
te.AddTask(task1, 0)
te.AddTask(task2, 0)
te.AddTask(task3, 0)
te.Start()
// Wait for task3 to complete
select {
case <-done:
// Task3 completed
case <-time.After(1 * time.Second):
t.Fatal("Timed out waiting for task3 to complete")
}
// Give a little extra time for any other tasks that might execute
time.Sleep(200 * time.Millisecond)
// Check that task1 executed and failed
if !executed["task1"] {
t.Error("task1 should have executed")
}
// Check that task2 did not execute due to failed dependency
if executed["task2"] {
t.Error("task2 should not have executed because task1 failed")
}
// Check that task3 executed (independent task)
if !executed["task3"] {
t.Error("task3 should have executed")
}
}
func TestCircularDependencies(t *testing.T) {
te := NewTaskExecutor(10)
// Create a circular dependency: task1 -> task2 -> task3 -> task1
task1 := &mockTask{
id: "task1",
dependencies: []string{"task3"},
executeFunc: func() error {
return nil
},
}
task2 := &mockTask{
id: "task2",
dependencies: []string{"task1"},
executeFunc: func() error {
return nil
},
}
task3 := &mockTask{
id: "task3",
dependencies: []string{"task2"},
executeFunc: func() error {
return nil
},
}
// The first two tasks should be added successfully
te.AddTask(task1, 0)
te.AddTask(task2, 0)
// This should be rejected due to circular dependency detection
te.AddTask(task3, 0)
// Start the executor
te.Start()
// Wait a bit to ensure tasks don't execute
time.Sleep(500 * time.Millisecond)
// There should still be only 2 tasks (task3 should have been rejected)
if te.Len() != 2 {
t.Errorf("Expected 2 tasks, got %d", te.Len())
}
}
func TestNonExistentDependency(t *testing.T) {
te := NewTaskExecutor(10)
// Create a task that depends on a non-existent task
executed := make(map[string]bool)
done := make(chan bool, 1)
task1 := &mockTask{
id: "task1",
dependencies: []string{"non-existent-task"},
executeFunc: func() error {
executed["task1"] = true
done <- true
return nil
},
}
// Add a control task to signal completion
task2 := &mockTask{
id: "task2",
dependencies: []string{},
executeFunc: func() error {
executed["task2"] = true
done <- true
return nil
},
}
// Both tasks should be added, but task1 won't run due to missing dependency
te.AddTask(task1, 0)
te.AddTask(task2, 0)
te.Start()
// Wait for the signal task to complete
select {
case <-done:
// Task2 completed
case <-time.After(1 * time.Second):
t.Fatal("Timed out waiting for tasks to complete")
}
// Give extra time for any other tasks
time.Sleep(200 * time.Millisecond)
// Task2 should have executed
if !executed["task2"] {
t.Error("task2 should have executed")
}
// Task1 should not have executed due to missing dependency
if executed["task1"] {
t.Error("task1 should not have executed due to missing dependency")
}
}
var ErrorTask = errors.New("task failed")
var executeTaskTestCases = []struct {
name string
@ -119,6 +457,7 @@ func TestExecuteTask(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
id: "task1",
executeFunc: func() error {
executeCalled = true
return tc.executeError