Refactor to new framework (#98)

- Adjust directory structure
```text
├── internal
│   ├── app
│   │   ├── artifactcache
│   │   ├── cmd
│   │   ├── poll
│   │   └── run
│   └── pkg
│       ├── client
│       ├── config
│       ├── envcheck
│       ├── labels
│       ├── report
│       └── ver
└── main.go
```
- New pkg `labels` to parse label
- New pkg `report` to report logs to Gitea
- Remove pkg `engine`, use `envcheck` to check if docker running.
- Rewrite `runtime` to `run`
- Rewrite `poller` to `poll`
- Simplify some code and remove what's useless.

Reviewed-on: https://gitea.com/gitea/act_runner/pulls/98
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
Co-authored-by: Jason Song <i@wolfogre.com>
Co-committed-by: Jason Song <i@wolfogre.com>
This commit is contained in:
Jason Song
2023-04-04 21:32:04 +08:00
committed by Lunny Xiao
parent df3cb60978
commit 220efa69c0
42 changed files with 630 additions and 974 deletions

View File

@ -0,0 +1,17 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package client
import (
"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
)
// A Client manages communication with the runner.
type Client interface {
pingv1connect.PingServiceClient
runnerv1connect.RunnerServiceClient
Address() string
Insecure() bool
}

View File

@ -0,0 +1,10 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package client
const (
UUIDHeader = "x-runner-uuid"
TokenHeader = "x-runner-token"
VersionHeader = "x-runner-version"
)

View File

@ -0,0 +1,81 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package client
import (
"context"
"crypto/tls"
"net/http"
"strings"
"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
"github.com/bufbuild/connect-go"
)
func getHttpClient(endpoint string, insecure bool) *http.Client {
if strings.HasPrefix(endpoint, "https://") && insecure {
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
}
return http.DefaultClient
}
// New returns a new runner client.
func New(endpoint string, insecure bool, uuid, token, version string, opts ...connect.ClientOption) *HTTPClient {
baseURL := strings.TrimRight(endpoint, "/") + "/api/actions"
opts = append(opts, connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
if uuid != "" {
req.Header().Set(UUIDHeader, uuid)
}
if token != "" {
req.Header().Set(TokenHeader, token)
}
if version != "" {
req.Header().Set(VersionHeader, version)
}
return next(ctx, req)
}
})))
return &HTTPClient{
PingServiceClient: pingv1connect.NewPingServiceClient(
getHttpClient(endpoint, insecure),
baseURL,
opts...,
),
RunnerServiceClient: runnerv1connect.NewRunnerServiceClient(
getHttpClient(endpoint, insecure),
baseURL,
opts...,
),
endpoint: endpoint,
insecure: insecure,
}
}
func (c *HTTPClient) Address() string {
return c.endpoint
}
func (c *HTTPClient) Insecure() bool {
return c.insecure
}
var _ Client = (*HTTPClient)(nil)
// An HTTPClient manages communication with the runner API.
type HTTPClient struct {
pingv1connect.PingServiceClient
runnerv1connect.RunnerServiceClient
endpoint string
insecure bool
}

View File

@ -0,0 +1,42 @@
# Example configuration file, it's safe to copy this as the default config file without any modification.
log:
# The level of logging, can be trace, debug, info, warn, error, fatal
level: info
runner:
# Where to store the registration result.
file: .runner
# Execute how many tasks concurrently at the same time.
capacity: 1
# Extra environment variables to run jobs.
envs:
A_TEST_ENV_NAME_1: a_test_env_value_1
A_TEST_ENV_NAME_2: a_test_env_value_2
# Extra environment variables to run jobs from a file.
# It will be ignored if it's empty or the file doesn't exist.
env_file: .env
# The timeout for a job to be finished.
# Please note that the Gitea instance also has a timeout (3h by default) for the job.
# So the job could be stopped by the Gitea instance if it's timeout is shorter than this.
timeout: 3h
# Whether skip verifying the TLS certificate of the Gitea instance.
insecure: false
cache:
# Enable cache server to use actions/cache.
enabled: true
# The directory to store the cache data.
# If it's empty, the cache data will be stored in $HOME/.cache/actcache.
dir: ""
# The host of the cache server.
# It's not for the address to listen, but the address to connect from job containers.
# So 0.0.0.0 is a bad choice, leave it empty to detect automatically.
host: ""
# The port of the cache server.
# 0 means to use a random available port.
port: 0
container:
# Which network to use for the job containers. Could be bridge, host, none, or the name of a custom network.
network_mode: bridge

View File

@ -0,0 +1,95 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/joho/godotenv"
"gopkg.in/yaml.v3"
)
type Config struct {
Log struct {
Level string `yaml:"level"`
} `yaml:"log"`
Runner struct {
File string `yaml:"file"`
Capacity int `yaml:"capacity"`
Envs map[string]string `yaml:"envs"`
EnvFile string `yaml:"env_file"`
Timeout time.Duration `yaml:"timeout"`
Insecure bool `yaml:"insecure"`
} `yaml:"runner"`
Cache struct {
Enabled *bool `yaml:"enabled"` // pointer to distinguish between false and not set, and it will be true if not set
Dir string `yaml:"dir"`
Host string `yaml:"host"`
Port uint16 `yaml:"port"`
} `yaml:"cache"`
Container struct {
NetworkMode string `yaml:"network_mode"`
}
}
// LoadDefault returns the default configuration.
// If file is not empty, it will be used to load the configuration.
func LoadDefault(file string) (*Config, error) {
cfg := &Config{}
if file != "" {
f, err := os.Open(file)
if err != nil {
return nil, err
}
defer f.Close()
decoder := yaml.NewDecoder(f)
if err := decoder.Decode(&cfg); err != nil {
return nil, err
}
}
compatibleWithOldEnvs(file != "", cfg)
if cfg.Runner.EnvFile != "" {
if stat, err := os.Stat(cfg.Runner.EnvFile); err == nil && !stat.IsDir() {
envs, err := godotenv.Read(cfg.Runner.EnvFile)
if err != nil {
return nil, fmt.Errorf("read env file %q: %w", cfg.Runner.EnvFile, err)
}
for k, v := range envs {
cfg.Runner.Envs[k] = v
}
}
}
if cfg.Log.Level == "" {
cfg.Log.Level = "info"
}
if cfg.Runner.File == "" {
cfg.Runner.File = ".runner"
}
if cfg.Runner.Capacity <= 0 {
cfg.Runner.Capacity = 1
}
if cfg.Runner.Timeout <= 0 {
cfg.Runner.Timeout = 3 * time.Hour
}
if cfg.Cache.Enabled == nil {
b := true
cfg.Cache.Enabled = &b
}
if *cfg.Cache.Enabled {
if cfg.Cache.Dir == "" {
home, _ := os.UserHomeDir()
cfg.Cache.Dir = filepath.Join(home, ".cache", "actcache")
}
}
if cfg.Container.NetworkMode == "" {
cfg.Container.NetworkMode = "bridge"
}
return cfg, nil
}

View File

@ -0,0 +1,62 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import (
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
)
// Deprecated: could be removed in the future. TODO: remove it when Gitea 1.20.0 is released.
// Be compatible with old envs.
func compatibleWithOldEnvs(fileUsed bool, cfg *Config) {
handleEnv := func(key string) (string, bool) {
if v, ok := os.LookupEnv(key); ok {
if fileUsed {
log.Warnf("env %s has been ignored because config file is used", key)
return "", false
}
log.Warnf("env %s will be deprecated, please use config file instead", key)
return v, true
}
return "", false
}
if v, ok := handleEnv("GITEA_DEBUG"); ok {
if b, _ := strconv.ParseBool(v); b {
cfg.Log.Level = "debug"
}
}
if v, ok := handleEnv("GITEA_TRACE"); ok {
if b, _ := strconv.ParseBool(v); b {
cfg.Log.Level = "trace"
}
}
if v, ok := handleEnv("GITEA_RUNNER_CAPACITY"); ok {
if i, _ := strconv.Atoi(v); i > 0 {
cfg.Runner.Capacity = i
}
}
if v, ok := handleEnv("GITEA_RUNNER_FILE"); ok {
cfg.Runner.File = v
}
if v, ok := handleEnv("GITEA_RUNNER_ENVIRON"); ok {
splits := strings.Split(v, ",")
if cfg.Runner.Envs == nil {
cfg.Runner.Envs = map[string]string{}
}
for _, split := range splits {
kv := strings.SplitN(split, ":", 2)
if len(kv) == 2 && kv[0] != "" {
cfg.Runner.Envs[kv[0]] = kv[1]
}
}
}
if v, ok := handleEnv("GITEA_RUNNER_ENV_FILE"); ok {
cfg.Runner.EnvFile = v
}
}

View File

@ -0,0 +1,9 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import _ "embed"
//go:embed config.example.yaml
var Example []byte

View File

@ -0,0 +1,54 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import (
"encoding/json"
"os"
)
const registrationWarning = "This file is automatically generated by act-runner. Do not edit it manually unless you know what you are doing. Removing this file will cause act runner to re-register as a new runner."
// Registration is the registration information for a runner
type Registration struct {
Warning string `json:"WARNING"` // Warning message to display, it's always the registrationWarning constant
ID int64 `json:"id"`
UUID string `json:"uuid"`
Name string `json:"name"`
Token string `json:"token"`
Address string `json:"address"`
Labels []string `json:"labels"`
}
func LoadRegistration(file string) (*Registration, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
}
defer f.Close()
var reg Registration
if err := json.NewDecoder(f).Decode(&reg); err != nil {
return nil, err
}
reg.Warning = ""
return &reg, nil
}
func SaveRegistration(file string, reg *Registration) error {
f, err := os.Create(file)
if err != nil {
return err
}
defer f.Close()
reg.Warning = registrationWarning
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
return enc.Encode(reg)
}

View File

@ -0,0 +1,5 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package envcheck provides a simple way to check if the environment is ready to run jobs.
package envcheck

View File

@ -0,0 +1,27 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package envcheck
import (
"context"
"fmt"
"github.com/docker/docker/client"
)
func CheckIfDockerRunning(ctx context.Context) error {
// TODO: if runner support configures to use docker, we need config.Config to pass in
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return err
}
defer cli.Close()
_, err = cli.Ping(ctx)
if err != nil {
return fmt.Errorf("cannot ping the docker daemon, does it running? %w", err)
}
return nil
}

View File

@ -0,0 +1,84 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package labels
import (
"fmt"
"strings"
)
const (
SchemeHost = "host"
SchemeDocker = "docker"
)
type Label struct {
Name string
Schema string
Arg string
}
func Parse(str string) (*Label, error) {
splits := strings.SplitN(str, ":", 3)
label := &Label{
Name: splits[0],
Schema: "host",
Arg: "",
}
if len(splits) >= 2 {
label.Schema = splits[1]
}
if len(splits) >= 3 {
label.Arg = splits[2]
}
if label.Schema != SchemeHost && label.Schema != SchemeDocker {
return nil, fmt.Errorf("unsupported schema: %s", label.Schema)
}
return label, nil
}
type Labels []*Label
func (l Labels) RequireDocker() bool {
for _, label := range l {
if label.Schema == SchemeDocker {
return true
}
}
return false
}
func (l Labels) PickPlatform(runsOn []string) string {
platforms := make(map[string]string, len(l))
for _, label := range l {
switch label.Schema {
case SchemeDocker:
// "//" will be ignored
// TODO maybe we should use 'ubuntu-18.04:docker:node:16-buster' instead
platforms[label.Name] = strings.TrimPrefix(label.Arg, "//")
case SchemeHost:
platforms[label.Name] = "-self-hosted"
default:
// It should not happen, because Parse has checked it.
continue
}
}
for _, v := range runsOn {
if v, ok := platforms[v]; ok {
return v
}
}
// TODO: support multiple labels
// like:
// ["ubuntu-22.04"] => "ubuntu:22.04"
// ["with-gpu"] => "linux:with-gpu"
// ["ubuntu-22.04", "with-gpu"] => "ubuntu:22.04_with-gpu"
// return default.
// So the runner receives a task with a label that the runner doesn't have,
// it happens when the user have edited the label of the runner in the web UI.
// TODO: it may be not correct, what if the runner is used as host mode only?
return "node:16-bullseye"
}

View File

@ -0,0 +1,64 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package labels
import (
"testing"
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)
func TestParse(t *testing.T) {
tests := []struct {
args string
want *Label
wantErr bool
}{
{
args: "ubuntu:docker://node:18",
want: &Label{
Name: "ubuntu",
Schema: "docker",
Arg: "//node:18",
},
wantErr: false,
},
{
args: "ubuntu:host",
want: &Label{
Name: "ubuntu",
Schema: "host",
Arg: "",
},
wantErr: false,
},
{
args: "ubuntu",
want: &Label{
Name: "ubuntu",
Schema: "host",
Arg: "",
},
wantErr: false,
},
{
args: "ubuntu:vm:ubuntu-18.04",
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.args, func(t *testing.T) {
got, err := Parse(tt.args)
if tt.wantErr {
require.Error(t, err)
return
} else {
require.NoError(t, err)
}
assert.DeepEqual(t, got, tt.want)
})
}
}

View File

@ -0,0 +1,298 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package report
import (
"context"
"fmt"
"strings"
"sync"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
retry "github.com/avast/retry-go/v4"
"github.com/bufbuild/connect-go"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client"
)
type Reporter struct {
ctx context.Context
cancel context.CancelFunc
closed bool
client client.Client
clientM sync.Mutex
logOffset int
logRows []*runnerv1.LogRow
logReplacer *strings.Replacer
state *runnerv1.TaskState
stateM sync.RWMutex
}
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
var oldnew []string
if v := task.Context.Fields["token"].GetStringValue(); v != "" {
oldnew = append(oldnew, v, "***")
}
for _, v := range task.Secrets {
oldnew = append(oldnew, v, "***")
}
return &Reporter{
ctx: ctx,
cancel: cancel,
client: client,
logReplacer: strings.NewReplacer(oldnew...),
state: &runnerv1.TaskState{
Id: task.Id,
},
}
}
func (r *Reporter) ResetSteps(l int) {
r.stateM.Lock()
defer r.stateM.Unlock()
for i := 0; i < l; i++ {
r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
Id: int64(i),
})
}
}
func (r *Reporter) Levels() []log.Level {
return log.AllLevels
}
func (r *Reporter) Fire(entry *log.Entry) error {
r.stateM.Lock()
defer r.stateM.Unlock()
log.WithFields(entry.Data).Trace(entry.Message)
timestamp := entry.Time
if r.state.StartedAt == nil {
r.state.StartedAt = timestamppb.New(timestamp)
}
stage := entry.Data["stage"]
if stage != "Main" {
if v, ok := entry.Data["jobResult"]; ok {
if jobResult, ok := r.parseResult(v); ok {
r.state.Result = jobResult
r.state.StoppedAt = timestamppb.New(timestamp)
for _, s := range r.state.Steps {
if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {
s.Result = runnerv1.Result_RESULT_CANCELLED
}
}
}
}
if !r.duringSteps() {
r.logRows = append(r.logRows, r.parseLogRow(entry))
}
return nil
}
var step *runnerv1.StepState
if v, ok := entry.Data["stepNumber"]; ok {
if v, ok := v.(int); ok && len(r.state.Steps) > v {
step = r.state.Steps[v]
}
}
if step == nil {
if !r.duringSteps() {
r.logRows = append(r.logRows, r.parseLogRow(entry))
}
return nil
}
if step.StartedAt == nil {
step.StartedAt = timestamppb.New(timestamp)
}
if v, ok := entry.Data["raw_output"]; ok {
if rawOutput, ok := v.(bool); ok && rawOutput {
if step.LogLength == 0 {
step.LogIndex = int64(r.logOffset + len(r.logRows))
}
step.LogLength++
r.logRows = append(r.logRows, r.parseLogRow(entry))
}
} else if !r.duringSteps() {
r.logRows = append(r.logRows, r.parseLogRow(entry))
}
if v, ok := entry.Data["stepResult"]; ok {
if stepResult, ok := r.parseResult(v); ok {
if step.LogLength == 0 {
step.LogIndex = int64(r.logOffset + len(r.logRows))
}
step.Result = stepResult
step.StoppedAt = timestamppb.New(timestamp)
}
}
return nil
}
func (r *Reporter) RunDaemon() {
if r.closed {
return
}
if r.ctx.Err() != nil {
return
}
_ = r.ReportLog(false)
_ = r.ReportState()
time.AfterFunc(time.Second, r.RunDaemon)
}
func (r *Reporter) Logf(format string, a ...interface{}) {
r.stateM.Lock()
defer r.stateM.Unlock()
if !r.duringSteps() {
r.logRows = append(r.logRows, &runnerv1.LogRow{
Time: timestamppb.Now(),
Content: fmt.Sprintf(format, a...),
})
}
}
func (r *Reporter) Close(lastWords string) error {
r.closed = true
r.stateM.Lock()
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
if lastWords == "" {
lastWords = "Early termination"
}
for _, v := range r.state.Steps {
if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
v.Result = runnerv1.Result_RESULT_CANCELLED
}
}
r.state.Result = runnerv1.Result_RESULT_FAILURE
r.logRows = append(r.logRows, &runnerv1.LogRow{
Time: timestamppb.Now(),
Content: lastWords,
})
return nil
} else if lastWords != "" {
r.logRows = append(r.logRows, &runnerv1.LogRow{
Time: timestamppb.Now(),
Content: lastWords,
})
}
r.stateM.Unlock()
return retry.Do(func() error {
if err := r.ReportLog(true); err != nil {
return err
}
return r.ReportState()
}, retry.Context(r.ctx))
}
func (r *Reporter) ReportLog(noMore bool) error {
r.clientM.Lock()
defer r.clientM.Unlock()
r.stateM.RLock()
rows := r.logRows
r.stateM.RUnlock()
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
TaskId: r.state.Id,
Index: int64(r.logOffset),
Rows: rows,
NoMore: noMore,
}))
if err != nil {
return err
}
ack := int(resp.Msg.AckIndex)
if ack < r.logOffset {
return fmt.Errorf("submitted logs are lost")
}
r.stateM.Lock()
r.logRows = r.logRows[ack-r.logOffset:]
r.logOffset = ack
r.stateM.Unlock()
if noMore && ack < r.logOffset+len(rows) {
return fmt.Errorf("not all logs are submitted")
}
return nil
}
func (r *Reporter) ReportState() error {
r.clientM.Lock()
defer r.clientM.Unlock()
r.stateM.RLock()
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateM.RUnlock()
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
State: state,
}))
if err != nil {
return err
}
if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
r.cancel()
}
return nil
}
func (r *Reporter) duringSteps() bool {
if steps := r.state.Steps; len(steps) == 0 {
return false
} else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {
return false
} else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {
return false
}
return true
}
var stringToResult = map[string]runnerv1.Result{
"success": runnerv1.Result_RESULT_SUCCESS,
"failure": runnerv1.Result_RESULT_FAILURE,
"skipped": runnerv1.Result_RESULT_SKIPPED,
"cancelled": runnerv1.Result_RESULT_CANCELLED,
}
func (r *Reporter) parseResult(result interface{}) (runnerv1.Result, bool) {
str := ""
if v, ok := result.(string); ok { // for jobResult
str = v
} else if v, ok := result.(fmt.Stringer); ok { // for stepResult
str = v.String()
}
ret, ok := stringToResult[str]
return ret, ok
}
func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {
content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' })
content = r.logReplacer.Replace(content)
return &runnerv1.LogRow{
Time: timestamppb.New(entry.Time),
Content: content,
}
}

View File

@ -0,0 +1,11 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package ver
// go build -ldflags "-X gitea.com/gitea/act_runner/internal/pkg/ver.version=1.2.3"
var version = "dev"
func Version() string {
return version
}