96 lines
2.1 KiB
Go
96 lines
2.1 KiB
Go
package task
|
|
|
|
import (
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
type Task interface {
|
|
Execute() error
|
|
}
|
|
|
|
type ScheduledTask struct {
|
|
task Task
|
|
interval time.Duration
|
|
lastRunTime time.Time
|
|
}
|
|
|
|
type TaskExecutor struct {
|
|
tasks []*ScheduledTask
|
|
rateLimit chan struct{}
|
|
taskChan chan *ScheduledTask
|
|
}
|
|
|
|
func NewTaskExecutor(rateLimit int) *TaskExecutor {
|
|
return &TaskExecutor{
|
|
rateLimit: make(chan struct{}, rateLimit),
|
|
taskChan: make(chan *ScheduledTask, 100),
|
|
}
|
|
}
|
|
|
|
func (te *TaskExecutor) Len() int {
|
|
return len(te.tasks)
|
|
}
|
|
|
|
func (te *TaskExecutor) AddTask(task Task, interval time.Duration) {
|
|
log.Printf("Adding task %T with interval %v\n", task, interval)
|
|
st := &ScheduledTask{
|
|
task: task,
|
|
interval: interval,
|
|
lastRunTime: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case te.taskChan <- st:
|
|
log.Printf("Task %T queued up with interval %v\n", task, interval)
|
|
default:
|
|
log.Printf("Failed to add task $T with interval %v, channel full\n", task, interval)
|
|
}
|
|
}
|
|
|
|
func (te *TaskExecutor) Start() {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case st := <-te.taskChan:
|
|
te.tasks = append(te.tasks, st)
|
|
if st.interval == 0 {
|
|
log.Printf("Task %T has an interval of 0, running now\n", st.task)
|
|
go func(st *ScheduledTask) {
|
|
te.executeTask(st)
|
|
}(st)
|
|
} else if st.interval < 0 {
|
|
log.Printf("Task %T has a negative interval, nonsensical, ignoring", st.task)
|
|
} else {
|
|
go func(st *ScheduledTask) {
|
|
ticker := time.NewTicker(st.interval)
|
|
defer ticker.Stop()
|
|
for t := range ticker.C {
|
|
if !te.shouldRun(st, t) {
|
|
continue
|
|
}
|
|
st.lastRunTime = t
|
|
log.Printf("Task %T last run time: %v, running now\n", st.task, st.lastRunTime)
|
|
go te.executeTask(st)
|
|
}
|
|
}(st)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (te *TaskExecutor) shouldRun(st *ScheduledTask, t time.Time) bool {
|
|
return t.Sub(st.lastRunTime) >= st.interval
|
|
}
|
|
|
|
func (te *TaskExecutor) executeTask(st *ScheduledTask) {
|
|
te.rateLimit <- struct{}{}
|
|
defer func() {
|
|
<-te.rateLimit
|
|
}()
|
|
if err := st.task.Execute(); err != nil {
|
|
log.Printf("Task %v failed: %v", st.task, err)
|
|
}
|
|
}
|