task/task.go

96 lines
2.1 KiB
Go
Raw Permalink Normal View History

2024-12-26 07:03:17 +00:00
package task
import (
"log"
"time"
)
type Task interface {
Execute() error
}
type ScheduledTask struct {
task Task
interval time.Duration
lastRunTime time.Time
}
type TaskExecutor struct {
tasks []*ScheduledTask
rateLimit chan struct{}
taskChan chan *ScheduledTask
}
func NewTaskExecutor(rateLimit int) *TaskExecutor {
return &TaskExecutor{
rateLimit: make(chan struct{}, rateLimit),
taskChan: make(chan *ScheduledTask, 100),
}
}
func (te *TaskExecutor) Len() int {
return len(te.tasks)
}
func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
log.Printf("Adding task %T with interval %v\n", task, interval)
st := &ScheduledTask{
task: task,
interval: interval,
lastRunTime: time.Now(),
}
select {
case te.taskChan <- st:
log.Printf("Task %T queued up with interval %v\n", task, interval)
default:
log.Printf("Failed to add task $T with interval %v, channel full\n", task, interval)
}
}
func (te *TaskExecutor) Start() {
go func() {
for {
select {
case st := <-te.taskChan:
te.tasks = append(te.tasks, st)
if st.interval == 0 {
log.Printf("Task %T has an interval of 0, running now\n", st.task)
go func(st *ScheduledTask) {
te.executeTask(st)
}(st)
} else if st.interval < 0 {
log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task)
} else {
go func(st *ScheduledTask) {
ticker := time.NewTicker(st.interval)
defer ticker.Stop()
for t := range ticker.C {
if !te.shouldRun(st, t) {
continue
}
st.lastRunTime = t
log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime)
go te.executeTask(st)
}
}(st)
}
}
}
}()
}
func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool {
return t.Sub(st.lastRunTime) >= st.interval
}
func (te *TaskExecutor) executeTask(st *ScheduledTask) {
te.rateLimit <- struct{}{}
defer func() {
<-te.rateLimit
}()
if err := st.task.Execute(); err != nil {
log.Printf("Task %v failed: %v", st.task, err)
}
}