mirror of
https://gitea.com/gitea/act_runner.git
synced 2025-10-24 10:38:55 +02:00
Merge branch 'main' into feature/fetch_task_with_index
This commit is contained in:
69
internal/app/cmd/cache-server.go
Normal file
69
internal/app/cmd/cache-server.go
Normal file
@@ -0,0 +1,69 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
"gitea.com/gitea/act_runner/internal/pkg/config"
|
||||
|
||||
"github.com/nektos/act/pkg/artifactcache"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
type cacheServerArgs struct {
|
||||
Dir string
|
||||
Host string
|
||||
Port uint16
|
||||
}
|
||||
|
||||
func runCacheServer(ctx context.Context, configFile *string, cacheArgs *cacheServerArgs) func(cmd *cobra.Command, args []string) error {
|
||||
return func(cmd *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadDefault(*configFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid configuration: %w", err)
|
||||
}
|
||||
|
||||
initLogging(cfg)
|
||||
|
||||
var (
|
||||
dir = cfg.Cache.Dir
|
||||
host = cfg.Cache.Host
|
||||
port = cfg.Cache.Port
|
||||
)
|
||||
|
||||
// cacheArgs has higher priority
|
||||
if cacheArgs.Dir != "" {
|
||||
dir = cacheArgs.Dir
|
||||
}
|
||||
if cacheArgs.Host != "" {
|
||||
host = cacheArgs.Host
|
||||
}
|
||||
if cacheArgs.Port != 0 {
|
||||
port = cacheArgs.Port
|
||||
}
|
||||
|
||||
cacheHandler, err := artifactcache.StartHandler(
|
||||
dir,
|
||||
host,
|
||||
port,
|
||||
log.StandardLogger().WithField("module", "cache_request"),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("cache server is listening on %v", cacheHandler.ExternalURL())
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, os.Interrupt)
|
||||
<-c
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -63,6 +63,19 @@ func Execute(ctx context.Context) {
|
||||
},
|
||||
})
|
||||
|
||||
// ./act_runner cache-server
|
||||
var cacheArgs cacheServerArgs
|
||||
cacheCmd := &cobra.Command{
|
||||
Use: "cache-server",
|
||||
Short: "Start a cache server for the cache action",
|
||||
Args: cobra.MaximumNArgs(0),
|
||||
RunE: runCacheServer(ctx, &configFile, &cacheArgs),
|
||||
}
|
||||
cacheCmd.Flags().StringVarP(&cacheArgs.Dir, "dir", "d", "", "Cache directory")
|
||||
cacheCmd.Flags().StringVarP(&cacheArgs.Host, "host", "s", "", "Host of the cache server")
|
||||
cacheCmd.Flags().Uint16VarP(&cacheArgs.Port, "port", "p", 0, "Port of the cache server")
|
||||
rootCmd.AddCommand(cacheCmd)
|
||||
|
||||
// hide completion command
|
||||
rootCmd.CompletionOptions.HiddenDefaultCmd = true
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -78,7 +79,7 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
||||
cfg.Container.DockerHost = dockerSocketPath
|
||||
}
|
||||
// check the scheme, if the scheme is not npipe or unix
|
||||
// set cfg.Container.DockerHost to "-" because it can't be mounted to the job conatiner
|
||||
// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
|
||||
if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
|
||||
scheme := cfg.Container.DockerHost[:protoIndex]
|
||||
if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
|
||||
@@ -160,6 +161,15 @@ func initLogging(cfg *config.Config) {
|
||||
}
|
||||
}
|
||||
|
||||
var commonSocketPaths = []string{
|
||||
"/var/run/docker.sock",
|
||||
"/var/run/podman/podman.sock",
|
||||
"$HOME/.colima/docker.sock",
|
||||
"$XDG_RUNTIME_DIR/docker.sock",
|
||||
`\\.\pipe\docker_engine`,
|
||||
"$HOME/.docker/run/docker.sock",
|
||||
}
|
||||
|
||||
func getDockerSocketPath(configDockerHost string) (string, error) {
|
||||
// a `-` means don't mount the docker socket to job containers
|
||||
if configDockerHost != "" && configDockerHost != "-" {
|
||||
@@ -171,5 +181,14 @@ func getDockerSocketPath(configDockerHost string) (string, error) {
|
||||
return socket, nil
|
||||
}
|
||||
|
||||
for _, p := range commonSocketPaths {
|
||||
if _, err := os.Lstat(os.ExpandEnv(p)); err == nil {
|
||||
if strings.HasPrefix(p, `\\.\`) {
|
||||
return "npipe://" + filepath.ToSlash(os.ExpandEnv(p)), nil
|
||||
}
|
||||
return "unix://" + filepath.ToSlash(os.ExpandEnv(p)), nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("daemon Docker Engine socket not found and docker_host config was invalid")
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ type executeArgs struct {
|
||||
envs []string
|
||||
envfile string
|
||||
secrets []string
|
||||
defaultActionsUrls []string
|
||||
defaultActionsURL string
|
||||
insecureSecrets bool
|
||||
privileged bool
|
||||
usernsMode string
|
||||
@@ -252,7 +252,7 @@ func runExecList(ctx context.Context, planner model.WorkflowPlanner, execArgs *e
|
||||
var filterPlan *model.Plan
|
||||
|
||||
// Determine the event name to be filtered
|
||||
var filterEventName string = ""
|
||||
var filterEventName string
|
||||
|
||||
if len(execArgs.event) > 0 {
|
||||
log.Infof("Using chosed event for filtering: %s", execArgs.event)
|
||||
@@ -289,7 +289,7 @@ func runExecList(ctx context.Context, planner model.WorkflowPlanner, execArgs *e
|
||||
}
|
||||
}
|
||||
|
||||
printList(filterPlan)
|
||||
_ = printList(filterPlan)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -359,11 +359,11 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
|
||||
execArgs.cacheHandler = handler
|
||||
|
||||
if len(execArgs.artifactServerAddr) == 0 {
|
||||
if ip := common.GetOutboundIP(); ip == nil {
|
||||
ip := common.GetOutboundIP()
|
||||
if ip == nil {
|
||||
return fmt.Errorf("unable to determine outbound IP address")
|
||||
} else {
|
||||
execArgs.artifactServerAddr = ip.String()
|
||||
}
|
||||
execArgs.artifactServerAddr = ip.String()
|
||||
}
|
||||
|
||||
if len(execArgs.artifactServerPath) == 0 {
|
||||
@@ -404,10 +404,10 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
|
||||
NoSkipCheckout: execArgs.noSkipCheckout,
|
||||
// PresetGitHubContext: preset,
|
||||
// EventJSON: string(eventJSON),
|
||||
ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%s", eventName),
|
||||
ContainerMaxLifetime: maxLifetime,
|
||||
ContainerNetworkMode: container.NetworkMode(execArgs.network),
|
||||
DefaultActionsURLs: execArgs.defaultActionsUrls,
|
||||
ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%s", eventName),
|
||||
ContainerMaxLifetime: maxLifetime,
|
||||
ContainerNetworkMode: container.NetworkMode(execArgs.network),
|
||||
DefaultActionInstance: execArgs.defaultActionsURL,
|
||||
PlatformPicker: func(_ []string) string {
|
||||
return execArgs.image
|
||||
},
|
||||
@@ -423,7 +423,7 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
|
||||
}
|
||||
|
||||
if !execArgs.debug {
|
||||
logLevel := log.Level(log.InfoLevel)
|
||||
logLevel := log.InfoLevel
|
||||
config.JobLoggerLevel = &logLevel
|
||||
}
|
||||
|
||||
@@ -480,7 +480,7 @@ func loadExecCmd(ctx context.Context) *cobra.Command {
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPath, "artifact-server-path", "", ".", "Defines the path where the artifact server stores uploads and retrieves downloads from. If not specified the artifact server will not start.")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerAddr, "artifact-server-addr", "", "", "Defines the address where the artifact server listens")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPort, "artifact-server-port", "", "34567", "Defines the port where the artifact server listens (will only bind to localhost).")
|
||||
execCmd.PersistentFlags().StringArrayVarP(&execArg.defaultActionsUrls, "default-actions-url", "", []string{"https://gitea.com", "https://github.com"}, "Defines the default url list of action instance.")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.defaultActionsURL, "default-actions-url", "", "https://github.com", "Defines the default url of action instance.")
|
||||
execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout")
|
||||
execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log")
|
||||
execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode")
|
||||
|
||||
@@ -47,12 +47,12 @@ func runRegister(ctx context.Context, regArgs *registerArgs, configFile *string)
|
||||
}
|
||||
|
||||
if regArgs.NoInteractive {
|
||||
if err := registerNoInteractive(*configFile, regArgs); err != nil {
|
||||
if err := registerNoInteractive(ctx, *configFile, regArgs); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
go func() {
|
||||
if err := registerInteractive(*configFile); err != nil {
|
||||
if err := registerInteractive(ctx, *configFile); err != nil {
|
||||
log.Fatal(err)
|
||||
return
|
||||
}
|
||||
@@ -187,7 +187,7 @@ func (r *registerInputs) assignToNext(stage registerStage, value string, cfg *co
|
||||
return StageUnknown
|
||||
}
|
||||
|
||||
func registerInteractive(configFile string) error {
|
||||
func registerInteractive(ctx context.Context, configFile string) error {
|
||||
var (
|
||||
reader = bufio.NewReader(os.Stdin)
|
||||
stage = StageInputInstance
|
||||
@@ -213,11 +213,10 @@ func registerInteractive(configFile string) error {
|
||||
|
||||
if stage == StageWaitingForRegistration {
|
||||
log.Infof("Registering runner, name=%s, instance=%s, labels=%v.", inputs.RunnerName, inputs.InstanceAddr, inputs.Labels)
|
||||
if err := doRegister(cfg, inputs); err != nil {
|
||||
if err := doRegister(ctx, cfg, inputs); err != nil {
|
||||
return fmt.Errorf("Failed to register runner: %w", err)
|
||||
} else {
|
||||
log.Infof("Runner registered successfully.")
|
||||
}
|
||||
log.Infof("Runner registered successfully.")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -250,7 +249,7 @@ func printStageHelp(stage registerStage) {
|
||||
}
|
||||
}
|
||||
|
||||
func registerNoInteractive(configFile string, regArgs *registerArgs) error {
|
||||
func registerNoInteractive(ctx context.Context, configFile string, regArgs *registerArgs) error {
|
||||
cfg, err := config.LoadDefault(configFile)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -282,16 +281,14 @@ func registerNoInteractive(configFile string, regArgs *registerArgs) error {
|
||||
log.WithError(err).Errorf("Invalid input, please re-run act command.")
|
||||
return nil
|
||||
}
|
||||
if err := doRegister(cfg, inputs); err != nil {
|
||||
if err := doRegister(ctx, cfg, inputs); err != nil {
|
||||
return fmt.Errorf("Failed to register runner: %w", err)
|
||||
}
|
||||
log.Infof("Runner registered successfully.")
|
||||
return nil
|
||||
}
|
||||
|
||||
func doRegister(cfg *config.Config, inputs *registerInputs) error {
|
||||
ctx := context.Background()
|
||||
|
||||
func doRegister(ctx context.Context, cfg *config.Config, inputs *registerInputs) error {
|
||||
// initial http client
|
||||
cli := client.New(
|
||||
inputs.InstanceAddr,
|
||||
@@ -307,7 +304,7 @@ func doRegister(cfg *config.Config, inputs *registerInputs) error {
|
||||
}))
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
|
||||
@@ -6,6 +6,7 @@ package poll
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
@@ -48,19 +49,30 @@ func (p *Poller) Poll(ctx context.Context) {
|
||||
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(err).Debug("limiter wait failed")
|
||||
}
|
||||
return
|
||||
p.pollTaskWithRateLimit(ctx, limiter)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) pollTaskWithRateLimit(ctx context.Context, limiter *rate.Limiter) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err := fmt.Errorf("panic: %v", r)
|
||||
log.WithError(err).Error("panic in pollTaskWithRateLimit")
|
||||
}
|
||||
task, ok := p.fetchTask(ctx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if err := p.runner.Run(ctx, task); err != nil {
|
||||
log.WithError(err).Error("failed to run task")
|
||||
}()
|
||||
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(err).Debug("limiter wait failed")
|
||||
}
|
||||
return
|
||||
}
|
||||
task, ok := p.fetchTask(ctx)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if err := p.runner.Run(ctx, task); err != nil {
|
||||
log.WithError(err).Error("failed to run task")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -53,17 +53,21 @@ func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client)
|
||||
envs[k] = v
|
||||
}
|
||||
if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled {
|
||||
cacheHandler, err := artifactcache.StartHandler(
|
||||
cfg.Cache.Dir,
|
||||
cfg.Cache.Host,
|
||||
cfg.Cache.Port,
|
||||
log.StandardLogger().WithField("module", "cache_request"),
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("cannot init cache server, it will be disabled: %v", err)
|
||||
// go on
|
||||
if cfg.Cache.ExternalServer != "" {
|
||||
envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer
|
||||
} else {
|
||||
envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/"
|
||||
cacheHandler, err := artifactcache.StartHandler(
|
||||
cfg.Cache.Dir,
|
||||
cfg.Cache.Host,
|
||||
cfg.Cache.Port,
|
||||
log.StandardLogger().WithField("module", "cache_request"),
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("cannot init cache server, it will be disabled: %v", err)
|
||||
// go on
|
||||
} else {
|
||||
envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,10 +91,9 @@ func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client)
|
||||
func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
|
||||
if _, ok := r.runningTasks.Load(task.Id); ok {
|
||||
return fmt.Errorf("task %d is already running", task.Id)
|
||||
} else {
|
||||
r.runningTasks.Store(task.Id, struct{}{})
|
||||
defer r.runningTasks.Delete(task.Id)
|
||||
}
|
||||
r.runningTasks.Store(task.Id, struct{}{})
|
||||
defer r.runningTasks.Delete(task.Id)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
|
||||
defer cancel()
|
||||
@@ -197,7 +200,7 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||
ContainerOptions: r.cfg.Container.Options,
|
||||
ContainerDaemonSocket: r.cfg.Container.DockerHost,
|
||||
Privileged: r.cfg.Container.Privileged,
|
||||
DefaultActionsURLs: parseDefaultActionsURLs(taskContext["gitea_default_actions_url"].GetStringValue()),
|
||||
DefaultActionInstance: taskContext["gitea_default_actions_url"].GetStringValue(),
|
||||
PlatformPicker: r.labels.PickPlatform,
|
||||
Vars: task.Vars,
|
||||
ValidVolumes: r.cfg.Container.ValidVolumes,
|
||||
@@ -219,16 +222,6 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||
return execErr
|
||||
}
|
||||
|
||||
func parseDefaultActionsURLs(s string) []string {
|
||||
urls := strings.Split(s, ",")
|
||||
trimmed := make([]string, 0, len(urls))
|
||||
for _, u := range urls {
|
||||
t := strings.TrimRight(strings.TrimSpace(u), "/")
|
||||
trimmed = append(trimmed, t)
|
||||
}
|
||||
return trimmed
|
||||
}
|
||||
|
||||
func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) {
|
||||
return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{
|
||||
Version: ver.Version(),
|
||||
|
||||
Reference in New Issue
Block a user