commit 2869941b0c3891f06e1da9226ce498c160ea6d41 Author: Jeremy Tregunna Date: Thu Dec 26 01:03:17 2024 -0600 feat: Initial task execution system diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..484444c --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.canoozie.net/jer/task + +go 1.23.2 diff --git a/task.go b/task.go new file mode 100644 index 0000000..1082eda --- /dev/null +++ b/task.go @@ -0,0 +1,95 @@ +package task + +import ( + "log" + "time" +) + +type Task interface { + Execute() error +} + +type ScheduledTask struct { + task Task + interval time.Duration + lastRunTime time.Time +} + +type TaskExecutor struct { + tasks []*ScheduledTask + rateLimit chan struct{} + taskChan chan *ScheduledTask +} + +func NewTaskExecutor(rateLimit int) *TaskExecutor { + return &TaskExecutor{ + rateLimit: make(chan struct{}, rateLimit), + taskChan: make(chan *ScheduledTask, 100), + } +} + +func (te *TaskExecutor) Len() int { + return len(te.tasks) +} + +func (te *TaskExecutor) AddTask(task Task, interval time.Duration) { + log.Printf("Adding task %T with interval %v\n", task, interval) + st := &ScheduledTask{ + task: task, + interval: interval, + lastRunTime: time.Now(), + } + + select { + case te.taskChan <- st: + log.Printf("Task %T queued up with interval %v\n", task, interval) + default: + log.Printf("Failed to add task $T with interval %v, channel full\n", task, interval) + } +} + +func (te *TaskExecutor) Start() { + go func() { + for { + select { + case st := <-te.taskChan: + te.tasks = append(te.tasks, st) + if st.interval == 0 { + log.Printf("Task %T has an interval of 0, running now\n", st.task) + go func(st *ScheduledTask) { + te.executeTask(st) + }(st) + } else if st.interval < 0 { + log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task) + } else { + go func(st *ScheduledTask) { + ticker := time.NewTicker(st.interval) + defer ticker.Stop() + for t := range ticker.C { + if !te.shouldRun(st, t) { + continue + } + st.lastRunTime = t + log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime) + go te.executeTask(st) + } + }(st) + } + } + } + }() +} + +func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool { + return t.Sub(st.lastRunTime) >= st.interval +} + +func (te *TaskExecutor) executeTask(st *ScheduledTask) { + te.rateLimit <- struct{}{} + defer func() { + <-te.rateLimit + }() + if err := st.task.Execute(); err != nil { + log.Printf("Task %v failed: %v", st.task, err) + } +} diff --git a/task_test.go b/task_test.go new file mode 100644 index 0000000..b374bdf --- /dev/null +++ b/task_test.go @@ -0,0 +1,137 @@ +package task + +import ( + "errors" + "testing" + "time" +) + +type mockTask struct { + executeFunc func() error +} + +func (m *mockTask) Execute() error { + return m.executeFunc() +} +func TestAddTask(t *testing.T) { + te := NewTaskExecutor(10) + te.AddTask(&mockTask{}, 1*time.Second) + if te.Len() != 1 { + t.Errorf("expected 1 task, got %d", te.Len()) + } +} +func TestExecuteTaskSuccess(t *testing.T) { + te := NewTaskExecutor(10) + executeCalled := false + te.AddTask(&mockTask{ + executeFunc: func() error { + executeCalled = true + return nil + }, + }, 50*time.Millisecond) + te.Start() + time.Sleep(200 * time.Millisecond) + if !executeCalled { + t.Error("expected execute to be called, but it was not") + } +} +func TestExecuteTaskFailure(t *testing.T) { + expectedError := errors.New("task failed") + te := NewTaskExecutor(10) + executeCalled := false + te.AddTask(&mockTask{ + executeFunc: func() error { + executeCalled = true + return expectedError + }, + }, 50*time.Millisecond) + te.Start() + time.Sleep(200 * time.Millisecond) + if !executeCalled { + t.Error("expected execute to be called, but it was not") + } +} +func TestRateLimit(t *testing.T) { + te := NewTaskExecutor(1) + for i := 0; i < 5; i++ { + delay := time.Duration(i) * time.Millisecond + te.AddTask(&mockTask{ + executeFunc: func() error { + return nil + }, + }, delay) + } + te.Start() + done := make(chan struct{}) + go func() { + close(done) + }() + select { + case <-time.After(200 * time.Millisecond): + t.Error("expected all tasks to be executed within 200ms, but they were not") + case <-done: + // test passed + } +} +func TestZeroInterval(t *testing.T) { + te := NewTaskExecutor(10) + executeCalled := false + te.AddTask(&mockTask{ + executeFunc: func() error { + executeCalled = true + return nil + }, + }, 0*time.Second) + te.Start() + time.Sleep(50 * time.Millisecond) + if !executeCalled { + t.Error("expected execute to be called, but it was not") + } +} +func TestNoTasks(t *testing.T) { + te := NewTaskExecutor(10) + te.Start() + time.Sleep(50 * time.Millisecond) + // test passed if no panic occurred +} + +var ErrorTask = errors.New("task failed") +var executeTaskTestCases = []struct { + name string + executeError error + expectedError error +}{ + { + name: "success", + executeError: nil, + expectedError: nil, + }, + { + name: "failure", + executeError: ErrorTask, + expectedError: ErrorTask, + }, +} + +func TestExecuteTask(t *testing.T) { + for _, tc := range executeTaskTestCases { + t.Run(tc.name, func(t *testing.T) { + te := NewTaskExecutor(10) + executeCalled := false + te.AddTask(&mockTask{ + executeFunc: func() error { + executeCalled = true + return tc.executeError + }, + }, 50*time.Millisecond) + te.Start() + time.Sleep(200 * time.Millisecond) + if !executeCalled { + t.Fatal("expected execute to be called, but it was not") + } + if tc.expectedError != nil && !errors.Is(tc.expectedError, tc.executeError) { + t.Errorf("expected error '%+v', got '%+v'", tc.expectedError, tc.executeError) + } + }) + } +}