feat: Initial task execution system

This commit is contained in:
Jeremy Tregunna 2024-12-26 01:03:17 -06:00
commit 2869941b0c
No known key found for this signature in database
GPG Key ID: 1278B36BA6F5D5E4
3 changed files with 235 additions and 0 deletions

3
go.mod Normal file
View File

@ -0,0 +1,3 @@
module git.canoozie.net/jer/task
go 1.23.2

95
task.go Normal file
View File

@ -0,0 +1,95 @@
package task
import (
"log"
"time"
)
type Task interface {
Execute() error
}
type ScheduledTask struct {
task Task
interval time.Duration
lastRunTime time.Time
}
type TaskExecutor struct {
tasks []*ScheduledTask
rateLimit chan struct{}
taskChan chan *ScheduledTask
}
func NewTaskExecutor(rateLimit int) *TaskExecutor {
return &TaskExecutor{
rateLimit: make(chan struct{}, rateLimit),
taskChan: make(chan *ScheduledTask, 100),
}
}
func (te *TaskExecutor) Len() int {
return len(te.tasks)
}
func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
log.Printf("Adding task %T with interval %v\n", task, interval)
st := &ScheduledTask{
task: task,
interval: interval,
lastRunTime: time.Now(),
}
select {
case te.taskChan <- st:
log.Printf("Task %T queued up with interval %v\n", task, interval)
default:
log.Printf("Failed to add task $T with interval %v, channel full\n", task, interval)
}
}
func (te *TaskExecutor) Start() {
go func() {
for {
select {
case st := <-te.taskChan:
te.tasks = append(te.tasks, st)
if st.interval == 0 {
log.Printf("Task %T has an interval of 0, running now\n", st.task)
go func(st *ScheduledTask) {
te.executeTask(st)
}(st)
} else if st.interval < 0 {
log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task)
} else {
go func(st *ScheduledTask) {
ticker := time.NewTicker(st.interval)
defer ticker.Stop()
for t := range ticker.C {
if !te.shouldRun(st, t) {
continue
}
st.lastRunTime = t
log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime)
go te.executeTask(st)
}
}(st)
}
}
}
}()
}
func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool {
return t.Sub(st.lastRunTime) >= st.interval
}
func (te *TaskExecutor) executeTask(st *ScheduledTask) {
te.rateLimit <- struct{}{}
defer func() {
<-te.rateLimit
}()
if err := st.task.Execute(); err != nil {
log.Printf("Task %v failed: %v", st.task, err)
}
}

137
task_test.go Normal file
View File

@ -0,0 +1,137 @@
package task
import (
"errors"
"testing"
"time"
)
type mockTask struct {
executeFunc func() error
}
func (m *mockTask) Execute() error {
return m.executeFunc()
}
func TestAddTask(t *testing.T) {
te := NewTaskExecutor(10)
te.AddTask(&mockTask{}, 1*time.Second)
if te.Len() != 1 {
t.Errorf("expected 1 task, got %d", te.Len())
}
}
func TestExecuteTaskSuccess(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
executeFunc: func() error {
executeCalled = true
return nil
},
}, 50*time.Millisecond)
te.Start()
time.Sleep(200 * time.Millisecond)
if !executeCalled {
t.Error("expected execute to be called, but it was not")
}
}
func TestExecuteTaskFailure(t *testing.T) {
expectedError := errors.New("task failed")
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
executeFunc: func() error {
executeCalled = true
return expectedError
},
}, 50*time.Millisecond)
te.Start()
time.Sleep(200 * time.Millisecond)
if !executeCalled {
t.Error("expected execute to be called, but it was not")
}
}
func TestRateLimit(t *testing.T) {
te := NewTaskExecutor(1)
for i := 0; i < 5; i++ {
delay := time.Duration(i) * time.Millisecond
te.AddTask(&mockTask{
executeFunc: func() error {
return nil
},
}, delay)
}
te.Start()
done := make(chan struct{})
go func() {
close(done)
}()
select {
case <-time.After(200 * time.Millisecond):
t.Error("expected all tasks to be executed within 200ms, but they were not")
case <-done:
// test passed
}
}
func TestZeroInterval(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
executeFunc: func() error {
executeCalled = true
return nil
},
}, 0*time.Second)
te.Start()
time.Sleep(50 * time.Millisecond)
if !executeCalled {
t.Error("expected execute to be called, but it was not")
}
}
func TestNoTasks(t *testing.T) {
te := NewTaskExecutor(10)
te.Start()
time.Sleep(50 * time.Millisecond)
// test passed if no panic occurred
}
var ErrorTask = errors.New("task failed")
var executeTaskTestCases = []struct {
name string
executeError error
expectedError error
}{
{
name: "success",
executeError: nil,
expectedError: nil,
},
{
name: "failure",
executeError: ErrorTask,
expectedError: ErrorTask,
},
}
func TestExecuteTask(t *testing.T) {
for _, tc := range executeTaskTestCases {
t.Run(tc.name, func(t *testing.T) {
te := NewTaskExecutor(10)
executeCalled := false
te.AddTask(&mockTask{
executeFunc: func() error {
executeCalled = true
return tc.executeError
},
}, 50*time.Millisecond)
te.Start()
time.Sleep(200 * time.Millisecond)
if !executeCalled {
t.Fatal("expected execute to be called, but it was not")
}
if tc.expectedError != nil && !errors.Is(tc.expectedError, tc.executeError) {
t.Errorf("expected error '%+v', got '%+v'", tc.expectedError, tc.executeError)
}
})
}
}