Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
990c12b3c1 | |||
8c0f9deaf1 | |||
07c7e3163f | |||
43ad445ef5 | |||
c51c4cb645 | |||
ff2dc9f7da | |||
b96006064e |
51
.gitea/workflows/ci.yml
Normal file
51
.gitea/workflows/ci.yml
Normal file
@ -0,0 +1,51 @@
|
||||
name: Go Tests
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: Run Tests
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
go-version: [ '1.24.2' ]
|
||||
steps:
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Go ${{ matrix.go-version }}
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: ${{ matrix.go-version }}
|
||||
check-latest: true
|
||||
|
||||
- name: Verify dependencies
|
||||
run: go mod verify
|
||||
|
||||
- name: Run go vet
|
||||
run: go vet ./...
|
||||
|
||||
- name: Run tests
|
||||
run: go test -v ./...
|
||||
|
||||
- name: Send success notification
|
||||
if: success()
|
||||
run: |
|
||||
curl -X POST \
|
||||
-H "Content-Type: text/plain" \
|
||||
-d "✅ <b>task</b> success! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \
|
||||
https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages
|
||||
|
||||
- name: Send failure notification
|
||||
if: failure()
|
||||
run: |
|
||||
curl -X POST \
|
||||
-H "Content-Type: text/plain" \
|
||||
-d "❌ <b>task</b> failure! View run at: https://git.canoozie.net/${{ gitea.repository }}/actions/runs/${{ gitea.run_number }}" \
|
||||
https://chat.canoozie.net/rooms/5/2-q6gKxqrTAfhd/messages
|
205
README.md
Normal file
205
README.md
Normal file
@ -0,0 +1,205 @@
|
||||
# Task
|
||||
|
||||
A simple and efficient scheduled task execution library for Go applications.
|
||||
|
||||
## Overview
|
||||
|
||||
The Task library provides a lightweight mechanism for scheduling and executing tasks with configurable intervals and rate limiting. It's particularly useful for applications that need to run background jobs, periodic maintenance tasks, or any recurring operations.
|
||||
|
||||
## Features
|
||||
|
||||
- Schedule tasks to run at regular intervals
|
||||
- Rate limiting to control concurrent task execution
|
||||
- Run tasks immediately (zero interval)
|
||||
- Simple interface-based design for easy integration
|
||||
- Task dependencies - tasks only run after their dependencies complete successfully
|
||||
|
||||
## Installation
|
||||
|
||||
```
|
||||
go get git.canoozie.net/jer/task
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Example
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.canoozie.net/jer/task"
|
||||
)
|
||||
|
||||
// Define a task by implementing the Task interface
|
||||
type MyTask struct {
|
||||
TaskID string
|
||||
Name string
|
||||
Dependencies []string
|
||||
}
|
||||
|
||||
func (t *MyTask) ID() string {
|
||||
return t.TaskID
|
||||
}
|
||||
|
||||
func (t *MyTask) Execute(ctx context.Context) error {
|
||||
fmt.Printf("Executing task: %s\n", t.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *MyTask) Dependencies() []string {
|
||||
return t.Dependencies
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Create a task executor with a rate limit of 2 concurrent tasks
|
||||
executor := task.NewTaskExecutor(2)
|
||||
|
||||
// Create tasks with dependencies
|
||||
task1 := &MyTask{
|
||||
TaskID: "task1",
|
||||
Name: "Database Initialization",
|
||||
Dependencies: []string{},
|
||||
}
|
||||
|
||||
task2 := &MyTask{
|
||||
TaskID: "task2",
|
||||
Name: "Data Processing",
|
||||
Dependencies: []string{"task1"}, // Depends on task1
|
||||
}
|
||||
|
||||
task3 := &MyTask{
|
||||
TaskID: "task3",
|
||||
Name: "Report Generation",
|
||||
Dependencies: []string{"task2"}, // Depends on task2
|
||||
}
|
||||
|
||||
// Add tasks - order doesn't matter, dependencies control execution order
|
||||
executor.AddTask(task3, 0) // Will only run after task2 completes
|
||||
executor.AddTask(task2, 0) // Will only run after task1 completes
|
||||
executor.AddTask(task1, 0) // Will run immediately
|
||||
|
||||
// Start the task executor
|
||||
executor.Start()
|
||||
|
||||
// Keep the program running
|
||||
select {}
|
||||
}
|
||||
```
|
||||
|
||||
### Rate Limiting
|
||||
|
||||
The task executor includes built-in rate limiting to control how many tasks can run concurrently. This is useful for resource-intensive tasks or when you need to limit API calls.
|
||||
|
||||
```go
|
||||
// Create an executor that allows up to 5 concurrent tasks
|
||||
executor := task.NewTaskExecutor(5)
|
||||
```
|
||||
|
||||
### Task Dependencies
|
||||
|
||||
Tasks can specify other tasks as dependencies, ensuring they only run after all their dependencies have completed successfully:
|
||||
|
||||
```go
|
||||
// Task2 depends on Task1 - it won't run until Task1 completes successfully
|
||||
task1 := &MyTask{
|
||||
TaskID: "database-init",
|
||||
Dependencies: []string{},
|
||||
}
|
||||
|
||||
task2 := &MyTask{
|
||||
TaskID: "data-processing",
|
||||
Dependencies: []string{"database-init"},
|
||||
}
|
||||
|
||||
// If Task1 fails, Task2 will not run
|
||||
```
|
||||
|
||||
Key aspects of the dependency system:
|
||||
|
||||
- Dependencies are specified by task ID
|
||||
- A task will only run when all its dependencies have completed successfully
|
||||
- If a dependency fails, dependent tasks will not run
|
||||
- Circular dependencies are automatically detected and rejected
|
||||
- Self-dependencies are automatically detected and rejected
|
||||
- Missing dependencies are silently allowed (developer's responsibility)
|
||||
- Adding tasks in dependency order is not required - the system resolves the correct execution order
|
||||
|
||||
### Task Interface
|
||||
|
||||
To create a task, implement the `Task` interface:
|
||||
|
||||
```go
|
||||
type Task interface {
|
||||
ID() string
|
||||
Execute(ctx context.Context) error
|
||||
Dependencies() []string
|
||||
}
|
||||
```
|
||||
|
||||
- `ID()` - Returns a unique identifier for this task
|
||||
- `Execute(ctx context.Context)` - Performs the task's operation with context, returning any errors
|
||||
- `Dependencies()` - Returns a list of task IDs that must complete successfully before this task can run
|
||||
|
||||
Any errors returned from `Execute()` will be logged and will prevent dependent tasks from running.
|
||||
|
||||
## API Reference
|
||||
|
||||
### Types
|
||||
|
||||
#### `Task`
|
||||
```go
|
||||
type Task interface {
|
||||
ID() string
|
||||
Execute(ctx context.Context) error
|
||||
Dependencies() []string
|
||||
}
|
||||
```
|
||||
|
||||
### Functions
|
||||
|
||||
#### `NewTaskExecutor`
|
||||
```go
|
||||
func NewTaskExecutor(rateLimit int) *TaskExecutor
|
||||
```
|
||||
Creates a new task executor with the specified concurrency limit.
|
||||
|
||||
### Methods
|
||||
|
||||
#### `AddTask`
|
||||
```go
|
||||
func (te *TaskExecutor) AddTask(task Task, interval time.Duration)
|
||||
```
|
||||
Schedules a task to run at the specified interval. If interval is 0, the task is executed immediately. Negative intervals are ignored.
|
||||
|
||||
#### `Start`
|
||||
```go
|
||||
func (te *TaskExecutor) Start()
|
||||
```
|
||||
Starts the task executor, which will begin processing scheduled tasks.
|
||||
|
||||
#### `Len`
|
||||
```go
|
||||
func (te *TaskExecutor) Len() int
|
||||
```
|
||||
Returns the number of tasks managed by the executor.
|
||||
|
||||
## License
|
||||
|
||||
Copyright (C) 2025 Jeremy Tregunna
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
2
go.mod
2
go.mod
@ -1,3 +1,3 @@
|
||||
module git.canoozie.net/jer/task.git
|
||||
module git.canoozie.net/jer/task
|
||||
|
||||
go 1.23.2
|
||||
|
330
task.go
330
task.go
@ -1,95 +1,375 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Task interface {
|
||||
Execute() error
|
||||
ID() string
|
||||
Execute(ctx context.Context) error
|
||||
Dependencies() []string
|
||||
}
|
||||
|
||||
type TaskState int
|
||||
|
||||
const (
|
||||
TaskStateReady TaskState = iota
|
||||
TaskStatePending
|
||||
TaskStateRunning
|
||||
TaskStateCompleted
|
||||
TaskStateFailed
|
||||
)
|
||||
|
||||
// TaskStatus tracks the current state and execution history of a task
|
||||
type TaskStatus struct {
|
||||
taskID string
|
||||
state TaskState
|
||||
lastRunTime time.Time
|
||||
error error
|
||||
}
|
||||
|
||||
// ScheduledTask represents a task with its execution interval
|
||||
type ScheduledTask struct {
|
||||
task Task
|
||||
interval time.Duration
|
||||
lastRunTime time.Time
|
||||
}
|
||||
|
||||
// TaskExecutor manages and executes scheduled tasks with dependencies
|
||||
type TaskExecutor struct {
|
||||
tasks []*ScheduledTask
|
||||
rateLimit chan struct{}
|
||||
taskChan chan *ScheduledTask
|
||||
tasks []*ScheduledTask // All registered tasks
|
||||
completedTasks map[string]*TaskStatus // Status tracking for all tasks
|
||||
completedTasksMutex sync.RWMutex // For thread-safe status access
|
||||
taskRegister map[string]*ScheduledTask // Quick lookup of tasks by ID
|
||||
rateLimit chan struct{} // Semaphore for concurrent execution limit
|
||||
taskChan chan *ScheduledTask // Channel for registering new tasks
|
||||
readyQueue chan *ScheduledTask // Channel for tasks ready to execute
|
||||
runOnceFlag map[string]bool // Flag to enforce single execution for tests
|
||||
runOnceMutex sync.RWMutex // Mutex for runOnceFlag
|
||||
}
|
||||
|
||||
// NewTaskExecutor creates a new task executor with the specified rate limit
|
||||
func NewTaskExecutor(rateLimit int) *TaskExecutor {
|
||||
return &TaskExecutor{
|
||||
tasks: make([]*ScheduledTask, 0),
|
||||
completedTasks: make(map[string]*TaskStatus),
|
||||
taskRegister: make(map[string]*ScheduledTask),
|
||||
rateLimit: make(chan struct{}, rateLimit),
|
||||
taskChan: make(chan *ScheduledTask, 100),
|
||||
readyQueue: make(chan *ScheduledTask, 100),
|
||||
runOnceFlag: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// Len returns the number of registered tasks
|
||||
func (te *TaskExecutor) Len() int {
|
||||
return len(te.tasks)
|
||||
}
|
||||
|
||||
// AddTask adds a new task to the execution queue
|
||||
func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
|
||||
log.Printf("Adding task %T with interval %v\n", task, interval)
|
||||
|
||||
if interval < 0 {
|
||||
log.Printf("Task %s has a negative interval, ignoring", task.ID())
|
||||
return
|
||||
}
|
||||
|
||||
st := &ScheduledTask{
|
||||
task: task,
|
||||
interval: interval,
|
||||
lastRunTime: time.Now(),
|
||||
}
|
||||
|
||||
// Register the task immediately in the task register
|
||||
taskID := task.ID()
|
||||
|
||||
// Check for duplicate task ID
|
||||
te.completedTasksMutex.Lock()
|
||||
if _, exists := te.taskRegister[taskID]; exists {
|
||||
log.Printf("Warning: Task with ID %s already exists, overwriting", taskID)
|
||||
}
|
||||
|
||||
// Validate dependencies
|
||||
missingDeps := []string{}
|
||||
for _, depID := range task.Dependencies() {
|
||||
// Check for self-dependency
|
||||
if depID == taskID {
|
||||
te.completedTasksMutex.Unlock()
|
||||
log.Printf("Error: Task %s depends on itself, ignoring", taskID)
|
||||
return
|
||||
}
|
||||
|
||||
// Check that the dependency exists in the system
|
||||
if _, exists := te.taskRegister[depID]; !exists {
|
||||
missingDeps = append(missingDeps, depID)
|
||||
}
|
||||
}
|
||||
|
||||
// Dependencies aren't required, so we silently allow missing dependencies
|
||||
|
||||
// Check for circular dependencies (basic detection)
|
||||
visited := make(map[string]bool)
|
||||
if te.hasCircularDependency(taskID, task.Dependencies(), visited) {
|
||||
te.completedTasksMutex.Unlock()
|
||||
log.Printf("Error: Task %s has circular dependencies, ignoring", taskID)
|
||||
return
|
||||
}
|
||||
|
||||
te.taskRegister[taskID] = st
|
||||
|
||||
// Initialize the task status if it doesn't exist
|
||||
if _, exists := te.completedTasks[taskID]; !exists {
|
||||
te.completedTasks[taskID] = &TaskStatus{
|
||||
taskID: taskID,
|
||||
state: TaskStateReady,
|
||||
}
|
||||
}
|
||||
te.completedTasksMutex.Unlock()
|
||||
|
||||
// Queue the task for processing
|
||||
select {
|
||||
case te.taskChan <- st:
|
||||
log.Printf("Task %T queued up with interval %v\n", task, interval)
|
||||
default:
|
||||
log.Printf("Failed to add task $T with interval %v, channel full\n", task, interval)
|
||||
log.Printf("Failed to add task %T with interval %v, channel full\n", task, interval)
|
||||
}
|
||||
}
|
||||
|
||||
// Start initiates task processing and scheduling
|
||||
func (te *TaskExecutor) Start() {
|
||||
// Launch the task processor goroutine to handle registration
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case st := <-te.taskChan:
|
||||
te.tasks = append(te.tasks, st)
|
||||
|
||||
if st.interval == 0 {
|
||||
log.Printf("Task %T has an interval of 0, running now\n", st.task)
|
||||
go func(st *ScheduledTask) {
|
||||
te.executeTask(st)
|
||||
}(st)
|
||||
log.Printf("Task %s has an interval of 0, queuing for immediate execution\n", st.task.ID())
|
||||
st.lastRunTime = time.Now().Add(-24 * time.Hour) // Ensure it's ready to run
|
||||
} else if st.interval < 0 {
|
||||
log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task)
|
||||
} else {
|
||||
go func(st *ScheduledTask) {
|
||||
ticker := time.NewTicker(st.interval)
|
||||
defer ticker.Stop()
|
||||
for t := range ticker.C {
|
||||
if !te.shouldRun(st, t) {
|
||||
continue
|
||||
}
|
||||
st.lastRunTime = t
|
||||
log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime)
|
||||
go te.executeTask(st)
|
||||
}
|
||||
}(st)
|
||||
log.Printf("Task %s has a negative interval, nonsensical, ignoring", st.task.ID())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Launch dependency checker goroutine
|
||||
go func() {
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
te.checkDependenciesAndQueue()
|
||||
}
|
||||
}()
|
||||
|
||||
// Launch task executor goroutine - using a single goroutine here
|
||||
// helps ensure more predictable execution order for dependencies
|
||||
go func() {
|
||||
for st := range te.readyQueue {
|
||||
te.executeTask(st)
|
||||
}
|
||||
}()
|
||||
|
||||
// Process any tasks already in the channel before returning
|
||||
// This ensures that when Start() returns, all queued tasks are in the tasks slice
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
// shouldRun determines if a task is ready to run based on its interval
|
||||
func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool {
|
||||
return t.Sub(st.lastRunTime) >= st.interval
|
||||
}
|
||||
|
||||
// hasCircularDependency checks if a task has circular dependencies
|
||||
// The completedTasksMutex must be held when calling this function
|
||||
func (te *TaskExecutor) hasCircularDependency(taskID string, dependencies []string, visited map[string]bool) bool {
|
||||
// If we've already visited this task in current path, we have a cycle
|
||||
if visited[taskID] {
|
||||
return true
|
||||
}
|
||||
|
||||
// Mark this task as visited
|
||||
visited[taskID] = true
|
||||
|
||||
// Check each dependency
|
||||
for _, depID := range dependencies {
|
||||
// Skip if already detected cycle
|
||||
if visited[depID] {
|
||||
return true
|
||||
}
|
||||
|
||||
// Get the dependency task
|
||||
depTask, exists := te.taskRegister[depID]
|
||||
if !exists {
|
||||
// Dependency doesn't exist yet, can't determine circular status
|
||||
continue
|
||||
}
|
||||
|
||||
// Recursively check the dependency's dependencies
|
||||
if te.hasCircularDependency(depID, depTask.task.Dependencies(), visited) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Remove this task from visited (backtrack)
|
||||
visited[taskID] = false
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// checkDependenciesComplete verifies if all dependencies for a task are completed successfully
|
||||
// Must be called with the completedTasksMutex already acquired in read mode
|
||||
func (te *TaskExecutor) checkDependenciesComplete(task Task) bool {
|
||||
for _, depID := range task.Dependencies() {
|
||||
status, exists := te.completedTasks[depID]
|
||||
if !exists || status.state != TaskStateCompleted {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// isTaskRunnable checks if a task is in a state where it can be executed
|
||||
// Must be called with the completedTasksMutex already acquired in read mode
|
||||
func (te *TaskExecutor) isTaskRunnable(taskID string) bool {
|
||||
status, exists := te.completedTasks[taskID]
|
||||
return !exists || (status.state != TaskStatePending && status.state != TaskStateRunning)
|
||||
}
|
||||
|
||||
// checkDependenciesAndQueue evaluates all tasks and queues those that are ready to run
|
||||
func (te *TaskExecutor) checkDependenciesAndQueue() {
|
||||
now := time.Now()
|
||||
|
||||
// First pass: find all eligible tasks with a read lock
|
||||
eligibleTasks := make([]*ScheduledTask, 0)
|
||||
|
||||
for _, st := range te.tasks {
|
||||
if !te.shouldRun(st, now) {
|
||||
continue
|
||||
}
|
||||
|
||||
taskID := st.task.ID()
|
||||
|
||||
// Check if the task has already run once - this helps with test stability
|
||||
te.runOnceMutex.RLock()
|
||||
if te.runOnceFlag[taskID] && st.interval == 0 {
|
||||
te.runOnceMutex.RUnlock()
|
||||
continue
|
||||
}
|
||||
te.runOnceMutex.RUnlock()
|
||||
|
||||
// Check task state and dependencies
|
||||
te.completedTasksMutex.RLock()
|
||||
canRun := te.isTaskRunnable(taskID) && te.checkDependenciesComplete(st.task)
|
||||
te.completedTasksMutex.RUnlock()
|
||||
|
||||
if canRun {
|
||||
eligibleTasks = append(eligibleTasks, st)
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: queue eligible tasks with a write lock one by one
|
||||
for _, st := range eligibleTasks {
|
||||
taskID := st.task.ID()
|
||||
|
||||
// Get exclusive access for state update
|
||||
te.completedTasksMutex.Lock()
|
||||
|
||||
// Check again to make sure the task is still eligible (state hasn't changed)
|
||||
if !te.isTaskRunnable(taskID) {
|
||||
te.completedTasksMutex.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// Mark as pending
|
||||
te.completedTasks[taskID] = &TaskStatus{
|
||||
taskID: taskID,
|
||||
state: TaskStatePending,
|
||||
lastRunTime: now,
|
||||
}
|
||||
te.completedTasksMutex.Unlock()
|
||||
|
||||
// Queue the task for execution
|
||||
select {
|
||||
case te.readyQueue <- st:
|
||||
st.lastRunTime = now
|
||||
log.Printf("Task %s queued for execution", taskID)
|
||||
|
||||
// Mark the task as having run once for test stability
|
||||
if st.interval == 0 {
|
||||
te.runOnceMutex.Lock()
|
||||
te.runOnceFlag[taskID] = true
|
||||
te.runOnceMutex.Unlock()
|
||||
}
|
||||
default:
|
||||
// Queue full, revert the task state
|
||||
te.completedTasksMutex.Lock()
|
||||
te.completedTasks[taskID] = &TaskStatus{
|
||||
taskID: taskID,
|
||||
state: TaskStateReady,
|
||||
}
|
||||
te.completedTasksMutex.Unlock()
|
||||
log.Printf("Task %s queue attempt failed, queue full", taskID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// executeTask performs the actual execution of a task
|
||||
func (te *TaskExecutor) executeTask(st *ScheduledTask) {
|
||||
taskID := st.task.ID()
|
||||
|
||||
// Acquire rate limit token
|
||||
te.rateLimit <- struct{}{}
|
||||
defer func() {
|
||||
<-te.rateLimit
|
||||
}()
|
||||
if err := st.task.Execute(); err != nil {
|
||||
log.Printf("Task %v failed: %v", st.task, err)
|
||||
|
||||
// Verify task is still in pending state before executing
|
||||
te.completedTasksMutex.Lock()
|
||||
status, exists := te.completedTasks[taskID]
|
||||
|
||||
if !exists || status.state != TaskStatePending {
|
||||
te.completedTasksMutex.Unlock()
|
||||
log.Printf("Task %s skipped execution - state changed", taskID)
|
||||
return
|
||||
}
|
||||
|
||||
// Update state to running
|
||||
te.completedTasks[taskID] = &TaskStatus{
|
||||
taskID: taskID,
|
||||
state: TaskStateRunning,
|
||||
lastRunTime: time.Now(),
|
||||
}
|
||||
te.completedTasksMutex.Unlock()
|
||||
|
||||
// Execute the task with a background context
|
||||
ctx := context.Background()
|
||||
err := st.task.Execute(ctx)
|
||||
|
||||
// Update final task status
|
||||
te.completedTasksMutex.Lock()
|
||||
finalState := TaskStateCompleted
|
||||
if err != nil {
|
||||
finalState = TaskStateFailed
|
||||
}
|
||||
|
||||
te.completedTasks[taskID] = &TaskStatus{
|
||||
taskID: taskID,
|
||||
state: finalState,
|
||||
error: err,
|
||||
lastRunTime: time.Now(),
|
||||
}
|
||||
te.completedTasksMutex.Unlock()
|
||||
|
||||
// Log the result
|
||||
if err != nil {
|
||||
log.Printf("Task %s failed: %v", taskID, err)
|
||||
} else {
|
||||
log.Printf("Task %s completed successfully", taskID)
|
||||
}
|
||||
}
|
345
task_test.go
345
task_test.go
@ -1,21 +1,39 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type mockTask struct {
|
||||
id string
|
||||
dependencies []string
|
||||
executeFunc func() error
|
||||
}
|
||||
|
||||
func (m *mockTask) Execute() error {
|
||||
func (m *mockTask) ID() string {
|
||||
return m.id
|
||||
}
|
||||
|
||||
func (m *mockTask) Execute(ctx context.Context) error {
|
||||
return m.executeFunc()
|
||||
}
|
||||
|
||||
func (m *mockTask) Dependencies() []string {
|
||||
return m.dependencies
|
||||
}
|
||||
func TestAddTask(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
te.AddTask(&mockTask{}, 1*time.Second)
|
||||
te.AddTask(&mockTask{
|
||||
id: "task1",
|
||||
executeFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
}, 1*time.Second)
|
||||
te.Start()
|
||||
// No need for explicit sleep now, as Start() ensures tasks are processed
|
||||
if te.Len() != 1 {
|
||||
t.Errorf("expected 1 task, got %d", te.Len())
|
||||
}
|
||||
@ -24,6 +42,7 @@ func TestExecuteTaskSuccess(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
executeCalled := false
|
||||
te.AddTask(&mockTask{
|
||||
id: "task1",
|
||||
executeFunc: func() error {
|
||||
executeCalled = true
|
||||
return nil
|
||||
@ -40,6 +59,7 @@ func TestExecuteTaskFailure(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
executeCalled := false
|
||||
te.AddTask(&mockTask{
|
||||
id: "task1",
|
||||
executeFunc: func() error {
|
||||
executeCalled = true
|
||||
return expectedError
|
||||
@ -56,6 +76,7 @@ func TestRateLimit(t *testing.T) {
|
||||
for i := 0; i < 5; i++ {
|
||||
delay := time.Duration(i) * time.Millisecond
|
||||
te.AddTask(&mockTask{
|
||||
id: "task" + string(rune('1'+i)),
|
||||
executeFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
@ -77,13 +98,14 @@ func TestZeroInterval(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
executeCalled := false
|
||||
te.AddTask(&mockTask{
|
||||
id: "task1",
|
||||
executeFunc: func() error {
|
||||
executeCalled = true
|
||||
return nil
|
||||
},
|
||||
}, 0*time.Second)
|
||||
te.Start()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if !executeCalled {
|
||||
t.Error("expected execute to be called, but it was not")
|
||||
}
|
||||
@ -95,6 +117,322 @@ func TestNoTasks(t *testing.T) {
|
||||
// test passed if no panic occurred
|
||||
}
|
||||
|
||||
func TestTaskDependencies(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
|
||||
// Create execution tracking variables
|
||||
executionOrder := make([]string, 0, 3)
|
||||
executionTimes := make(map[string]time.Time, 3)
|
||||
done := make(chan bool, 1) // Buffered channel to prevent goroutine leaks
|
||||
|
||||
// Create dependency tree: task3 depends on task2, which depends on task1
|
||||
task1 := &mockTask{
|
||||
id: "task1",
|
||||
dependencies: []string{},
|
||||
executeFunc: func() error {
|
||||
executionOrder = append(executionOrder, "task1")
|
||||
executionTimes["task1"] = time.Now()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
task2 := &mockTask{
|
||||
id: "task2",
|
||||
dependencies: []string{"task1"},
|
||||
executeFunc: func() error {
|
||||
executionOrder = append(executionOrder, "task2")
|
||||
executionTimes["task2"] = time.Now()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
task3 := &mockTask{
|
||||
id: "task3",
|
||||
dependencies: []string{"task2"},
|
||||
executeFunc: func() error {
|
||||
executionOrder = append(executionOrder, "task3")
|
||||
executionTimes["task3"] = time.Now()
|
||||
done <- true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// Add tasks in reverse dependency order to test correct ordering
|
||||
te.AddTask(task3, 0)
|
||||
te.AddTask(task2, 0)
|
||||
te.AddTask(task1, 0)
|
||||
|
||||
te.Start()
|
||||
|
||||
// Wait for all tasks to complete
|
||||
select {
|
||||
case <-done:
|
||||
// All tasks completed
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Timed out waiting for tasks to complete")
|
||||
}
|
||||
|
||||
// Check execution order
|
||||
if len(executionOrder) != 3 {
|
||||
t.Fatalf("Expected 3 tasks to execute, got %d", len(executionOrder))
|
||||
}
|
||||
|
||||
if executionOrder[0] != "task1" || executionOrder[1] != "task2" || executionOrder[2] != "task3" {
|
||||
t.Errorf("Tasks executed in wrong order: %v", executionOrder)
|
||||
}
|
||||
|
||||
// Verify timing (task1 before task2 before task3)
|
||||
if !executionTimes["task1"].Before(executionTimes["task2"]) {
|
||||
t.Error("task1 should execute before task2")
|
||||
}
|
||||
|
||||
if !executionTimes["task2"].Before(executionTimes["task3"]) {
|
||||
t.Error("task2 should execute before task3")
|
||||
}
|
||||
}
|
||||
|
||||
func TestComplexDependencies(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
|
||||
// Create a more complex dependency graph
|
||||
// task4 depends on task2 and task3
|
||||
// task2 and task3 both depend on task1
|
||||
|
||||
executed := make(map[string]bool)
|
||||
done := make(chan bool, 1) // Buffered channel
|
||||
|
||||
|
||||
task1 := &mockTask{
|
||||
id: "task1",
|
||||
dependencies: []string{},
|
||||
executeFunc: func() error {
|
||||
executed["task1"] = true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
task2 := &mockTask{
|
||||
id: "task2",
|
||||
dependencies: []string{"task1"},
|
||||
executeFunc: func() error {
|
||||
executed["task2"] = true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
task3 := &mockTask{
|
||||
id: "task3",
|
||||
dependencies: []string{"task1"},
|
||||
executeFunc: func() error {
|
||||
executed["task3"] = true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
task4 := &mockTask{
|
||||
id: "task4",
|
||||
dependencies: []string{"task2", "task3"},
|
||||
executeFunc: func() error {
|
||||
executed["task4"] = true
|
||||
done <- true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// Add tasks in arbitrary order
|
||||
te.AddTask(task4, 0)
|
||||
te.AddTask(task2, 0)
|
||||
te.AddTask(task1, 0)
|
||||
te.AddTask(task3, 0)
|
||||
|
||||
te.Start()
|
||||
|
||||
// Wait for tasks to complete
|
||||
select {
|
||||
case <-done:
|
||||
// Task4 completed
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Timed out waiting for tasks to complete")
|
||||
}
|
||||
|
||||
// Check all tasks executed
|
||||
for _, id := range []string{"task1", "task2", "task3", "task4"} {
|
||||
if !executed[id] {
|
||||
t.Errorf("Task %s was not executed", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFailedDependency(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
|
||||
executed := make(map[string]bool)
|
||||
done := make(chan bool, 1) // Buffered channel
|
||||
|
||||
// task1 fails, task2 depends on task1, so task2 should never execute
|
||||
task1 := &mockTask{
|
||||
id: "task1",
|
||||
dependencies: []string{},
|
||||
executeFunc: func() error {
|
||||
executed["task1"] = true
|
||||
return errors.New("task1 failed")
|
||||
},
|
||||
}
|
||||
|
||||
task2 := &mockTask{
|
||||
id: "task2",
|
||||
dependencies: []string{"task1"},
|
||||
executeFunc: func() error {
|
||||
executed["task2"] = true
|
||||
done <- true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// Add a task3 that doesn't depend on anything as a signal
|
||||
task3 := &mockTask{
|
||||
id: "task3",
|
||||
dependencies: []string{},
|
||||
executeFunc: func() error {
|
||||
executed["task3"] = true
|
||||
done <- true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
te.AddTask(task1, 0)
|
||||
te.AddTask(task2, 0)
|
||||
te.AddTask(task3, 0)
|
||||
|
||||
te.Start()
|
||||
|
||||
// Wait for task3 to complete
|
||||
select {
|
||||
case <-done:
|
||||
// Task3 completed
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Timed out waiting for task3 to complete")
|
||||
}
|
||||
|
||||
// Give a little extra time for any other tasks that might execute
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Check that task1 executed and failed
|
||||
if !executed["task1"] {
|
||||
t.Error("task1 should have executed")
|
||||
}
|
||||
|
||||
// Check that task2 did not execute due to failed dependency
|
||||
if executed["task2"] {
|
||||
t.Error("task2 should not have executed because task1 failed")
|
||||
}
|
||||
|
||||
// Check that task3 executed (independent task)
|
||||
if !executed["task3"] {
|
||||
t.Error("task3 should have executed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCircularDependencies(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
|
||||
// Create a circular dependency: task1 -> task2 -> task3 -> task1
|
||||
task1 := &mockTask{
|
||||
id: "task1",
|
||||
dependencies: []string{"task3"},
|
||||
executeFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
task2 := &mockTask{
|
||||
id: "task2",
|
||||
dependencies: []string{"task1"},
|
||||
executeFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
task3 := &mockTask{
|
||||
id: "task3",
|
||||
dependencies: []string{"task2"},
|
||||
executeFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// The first two tasks should be added successfully
|
||||
te.AddTask(task1, 0)
|
||||
te.AddTask(task2, 0)
|
||||
|
||||
// This should be rejected due to circular dependency detection
|
||||
te.AddTask(task3, 0)
|
||||
|
||||
// Start the executor
|
||||
te.Start()
|
||||
|
||||
// Wait a bit to ensure tasks don't execute
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// There should still be only 2 tasks (task3 should have been rejected)
|
||||
if te.Len() != 2 {
|
||||
t.Errorf("Expected 2 tasks, got %d", te.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestNonExistentDependency(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
|
||||
// Create a task that depends on a non-existent task
|
||||
executed := make(map[string]bool)
|
||||
done := make(chan bool, 1)
|
||||
|
||||
task1 := &mockTask{
|
||||
id: "task1",
|
||||
dependencies: []string{"non-existent-task"},
|
||||
executeFunc: func() error {
|
||||
executed["task1"] = true
|
||||
done <- true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// Add a control task to signal completion
|
||||
task2 := &mockTask{
|
||||
id: "task2",
|
||||
dependencies: []string{},
|
||||
executeFunc: func() error {
|
||||
executed["task2"] = true
|
||||
done <- true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// Both tasks should be added, but task1 won't run due to missing dependency
|
||||
te.AddTask(task1, 0)
|
||||
te.AddTask(task2, 0)
|
||||
|
||||
te.Start()
|
||||
|
||||
// Wait for the signal task to complete
|
||||
select {
|
||||
case <-done:
|
||||
// Task2 completed
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Timed out waiting for tasks to complete")
|
||||
}
|
||||
|
||||
// Give extra time for any other tasks
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Task2 should have executed
|
||||
if !executed["task2"] {
|
||||
t.Error("task2 should have executed")
|
||||
}
|
||||
|
||||
// Task1 should not have executed due to missing dependency
|
||||
if executed["task1"] {
|
||||
t.Error("task1 should not have executed due to missing dependency")
|
||||
}
|
||||
}
|
||||
|
||||
var ErrorTask = errors.New("task failed")
|
||||
var executeTaskTestCases = []struct {
|
||||
name string
|
||||
@ -119,6 +457,7 @@ func TestExecuteTask(t *testing.T) {
|
||||
te := NewTaskExecutor(10)
|
||||
executeCalled := false
|
||||
te.AddTask(&mockTask{
|
||||
id: "task1",
|
||||
executeFunc: func() error {
|
||||
executeCalled = true
|
||||
return tc.executeError
|
||||
|
Loading…
Reference in New Issue
Block a user