Merge branch 'main' into feature/fetch_task_with_index

This commit is contained in:
sillyguodong 2023-07-24 15:27:58 +08:00
commit 27ea804e21

View File

@ -48,18 +48,6 @@ func (p *Poller) Poll(ctx context.Context) {
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) { func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
defer wg.Done() defer wg.Done()
for { for {
p.pollTaskWithRateLimit(ctx, limiter)
}
}
func (p *Poller) pollTaskWithRateLimit(ctx context.Context, limiter *rate.Limiter) {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("panic: %v", r)
log.WithError(err).Error("panic in pollTaskWithRateLimit")
}
}()
if err := limiter.Wait(ctx); err != nil { if err := limiter.Wait(ctx); err != nil {
if ctx.Err() != nil { if ctx.Err() != nil {
log.WithError(err).Debug("limiter wait failed") log.WithError(err).Debug("limiter wait failed")
@ -68,8 +56,20 @@ func (p *Poller) pollTaskWithRateLimit(ctx context.Context, limiter *rate.Limite
} }
task, ok := p.fetchTask(ctx) task, ok := p.fetchTask(ctx)
if !ok { if !ok {
return continue
} }
p.runTaskWithRecover(ctx, task)
}
}
func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("panic: %v", r)
log.WithError(err).Error("panic in runTaskWithRecover")
}
}()
if err := p.runner.Run(ctx, task); err != nil { if err := p.runner.Run(ctx, task); err != nil {
log.WithError(err).Error("failed to run task") log.WithError(err).Error("failed to run task")
} }