diff --git a/client/client.go b/client/client.go index b314e93..15aec62 100644 --- a/client/client.go +++ b/client/client.go @@ -1,9 +1,8 @@ package client import ( - "context" - - runnerv1 "gitea.com/gitea/proto-go/runner/v1" + "gitea.com/gitea/proto-go/ping/v1/pingv1connect" + "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" ) type Filter struct { @@ -16,21 +15,6 @@ type Filter struct { // A Client manages communication with the runner. type Client interface { - // Ping sends a ping message to the server to test connectivity. - Ping(ctx context.Context, machine string) error - - // Register for new runner. - Register(ctx context.Context, args *runnerv1.RegisterRequest) (*runnerv1.Runner, error) - - // Request requests the next available build stage for execution. - Request(ctx context.Context, args *runnerv1.RequestRequest) (*runnerv1.Stage, error) - - // Detail fetches build details - Detail(ctx context.Context, args *runnerv1.DetailRequest) (*runnerv1.DetailResponse, error) - - // Update updates the build stage. - Update(ctxt context.Context, args *runnerv1.UpdateRequest) error - - // UpdateStep updates the build step. - UpdateStep(ctx context.Context, args *runnerv1.UpdateStepRequest) error + pingv1connect.PingServiceClient + runnerv1connect.RunnerServiceClient } diff --git a/client/http.go b/client/http.go index da88b9d..6acdd98 100644 --- a/client/http.go +++ b/client/http.go @@ -7,9 +7,7 @@ import ( "net/http" "time" - pingv1 "gitea.com/gitea/proto-go/ping/v1" "gitea.com/gitea/proto-go/ping/v1/pingv1connect" - runnerv1 "gitea.com/gitea/proto-go/runner/v1" "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" "github.com/bufbuild/connect-go" @@ -17,34 +15,40 @@ import ( ) // New returns a new runner client. -func New(endpoint, secret string, skipverify bool, opts ...Option) *HTTPClient { - client := &HTTPClient{ - Endpoint: endpoint, - Secret: secret, - SkipVerify: skipverify, - } +func New(endpoint, secret string, opts ...Option) *HTTPClient { + cfg := &config{} // Loop through each option for _, opt := range opts { // Call the option giving the instantiated - opt.Apply(client) + opt.apply(cfg) } - client.Client = &http.Client{ - Timeout: 1 * time.Minute, - CheckRedirect: func(*http.Request, []*http.Request) error { - return http.ErrUseLastResponse - }, - Transport: &http2.Transport{ - AllowHTTP: true, - DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { - return net.Dial(netw, addr) + interceptor := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + req.Header().Set("X-Runner-Token", secret) + return next(ctx, req) + } + }) + cfg.opts = append(cfg.opts, connect.WithInterceptors(interceptor)) + + if cfg.httpClient == nil { + cfg.httpClient = &http.Client{ + Timeout: 1 * time.Minute, + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse }, - }, + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(netw, addr) + }, + }, + } } - if skipverify { - client.Client = &http.Client{ + if cfg.skipVerify { + cfg.httpClient = &http.Client{ CheckRedirect: func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }, @@ -56,119 +60,25 @@ func New(endpoint, secret string, skipverify bool, opts ...Option) *HTTPClient { }, } } - return client + + return &HTTPClient{ + PingServiceClient: pingv1connect.NewPingServiceClient( + cfg.httpClient, + endpoint, + cfg.opts..., + ), + RunnerServiceClient: runnerv1connect.NewRunnerServiceClient( + cfg.httpClient, + endpoint, + cfg.opts..., + ), + } } var _ Client = (*HTTPClient)(nil) // An HTTPClient manages communication with the runner API. type HTTPClient struct { - Client *http.Client - Endpoint string - Secret string - SkipVerify bool - - opts []connect.ClientOption -} - -// Ping sends a ping message to the server to test connectivity. -func (p *HTTPClient) Ping(ctx context.Context, machine string) error { - client := pingv1connect.NewPingServiceClient( - p.Client, - p.Endpoint, - p.opts..., - ) - req := connect.NewRequest(&pingv1.PingRequest{ - Data: machine, - }) - - req.Header().Set("X-Runner-Token", p.Secret) - - _, err := client.Ping(ctx, req) - return err -} - -// Register a new runner. -func (p *HTTPClient) Register(ctx context.Context, arg *runnerv1.RegisterRequest) (*runnerv1.Runner, error) { - client := runnerv1connect.NewRunnerServiceClient( - p.Client, - p.Endpoint, - p.opts..., - ) - req := connect.NewRequest(arg) - req.Header().Set("X-Runner-Token", p.Secret) - - res, err := client.Register(ctx, req) - if err != nil { - return nil, err - } - - return res.Msg.Runner, err -} - -// Request requests the next available build stage for execution. -func (p *HTTPClient) Request(ctx context.Context, arg *runnerv1.RequestRequest) (*runnerv1.Stage, error) { - client := runnerv1connect.NewRunnerServiceClient( - p.Client, - p.Endpoint, - p.opts..., - ) - req := connect.NewRequest(arg) - req.Header().Set("X-Runner-Token", p.Secret) - - res, err := client.Request(ctx, req) - if err != nil { - return nil, err - } - - return res.Msg.Stage, err -} - -// Update updates the build stage. -func (p *HTTPClient) Update(ctx context.Context, arg *runnerv1.UpdateRequest) error { - client := runnerv1connect.NewRunnerServiceClient( - p.Client, - p.Endpoint, - p.opts..., - ) - req := connect.NewRequest(arg) - req.Header().Set("X-Runner-Token", p.Secret) - - _, err := client.Update(ctx, req) - - return err -} - -// UpdateStep updates the build step. -func (p *HTTPClient) UpdateStep(ctx context.Context, arg *runnerv1.UpdateStepRequest) error { - client := runnerv1connect.NewRunnerServiceClient( - p.Client, - p.Endpoint, - p.opts..., - ) - req := connect.NewRequest(arg) - req.Header().Set("X-Runner-Token", p.Secret) - - _, err := client.UpdateStep(ctx, req) - - return err -} - -// Detail fetches build details -func (p *HTTPClient) Detail(ctx context.Context, arg *runnerv1.DetailRequest) (*runnerv1.DetailResponse, error) { - client := runnerv1connect.NewRunnerServiceClient( - p.Client, - p.Endpoint, - p.opts..., - ) - req := connect.NewRequest(arg) - req.Header().Set("X-Runner-Token", p.Secret) - - resp, err := client.Detail(ctx, req) - - return &runnerv1.DetailResponse{ - Repo: resp.Msg.Repo, - Build: resp.Msg.Build, - Stage: resp.Msg.Stage, - }, err + pingv1connect.PingServiceClient + runnerv1connect.RunnerServiceClient } diff --git a/client/options.go b/client/options.go index f45d7ab..bd529ac 100644 --- a/client/options.go +++ b/client/options.go @@ -1,36 +1,58 @@ package client -import "github.com/bufbuild/connect-go" +import ( + "net/http" + + "github.com/bufbuild/connect-go" +) + +type config struct { + httpClient *http.Client + skipVerify bool + opts []connect.ClientOption +} // An Option configures a mutex. type Option interface { - Apply(*HTTPClient) + apply(*config) } // OptionFunc is a function that configure a value. -type OptionFunc func(*HTTPClient) +type OptionFunc func(*config) // Apply calls f(option) -func (f OptionFunc) Apply(cli *HTTPClient) { - f(cli) +func (f OptionFunc) apply(cfg *config) { + f(cfg) +} + +func WithSkipVerify(c bool) Option { + return OptionFunc(func(cfg *config) { + cfg.skipVerify = c + }) +} + +func WithClientOptions(opts ...connect.ClientOption) Option { + return OptionFunc(func(cfg *config) { + cfg.opts = append(cfg.opts, opts...) + }) } // WithGRPC configures clients to use the HTTP/2 gRPC protocol. func WithGRPC(c bool) Option { - return OptionFunc(func(cli *HTTPClient) { + return OptionFunc(func(cfg *config) { if !c { return } - cli.opts = append(cli.opts, connect.WithGRPC()) + cfg.opts = append(cfg.opts, connect.WithGRPC()) }) } // WithGRPCWeb configures clients to use the gRPC-Web protocol. func WithGRPCWeb(c bool) Option { - return OptionFunc(func(cli *HTTPClient) { + return OptionFunc(func(cfg *config) { if !c { return } - cli.opts = append(cli.opts, connect.WithGRPCWeb()) + cfg.opts = append(cfg.opts, connect.WithGRPCWeb()) }) } diff --git a/cmd/config.go b/cmd/config.go index 0b266f9..4de3e48 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -20,7 +20,7 @@ type ( Host string `envconfig:"GITEA_RPC_HOST" required:"true"` Secret string `envconfig:"GITEA_RPC_SECRET" required:"true"` SkipVerify bool `envconfig:"GITEA_RPC_SKIP_VERIFY"` - GRPC bool `envconfig:"GITEA_RPC_GRPC"` + GRPC bool `envconfig:"GITEA_RPC_GRPC" default:"true"` GRPCWeb bool `envconfig:"GITEA_RPC_GRPC_WEB"` } diff --git a/cmd/daemon.go b/cmd/daemon.go index f7b6198..0dd8cf8 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -9,6 +9,8 @@ import ( "gitea.com/gitea/act_runner/poller" "gitea.com/gitea/act_runner/runtime" + pingv1 "gitea.com/gitea/proto-go/ping/v1" + "github.com/bufbuild/connect-go" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,13 +39,15 @@ func runDaemon(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, cli := client.New( cfg.Client.Address, cfg.Client.Secret, - cfg.Client.SkipVerify, + client.WithSkipVerify(cfg.Client.SkipVerify), client.WithGRPC(cfg.Client.GRPC), client.WithGRPCWeb(cfg.Client.GRPCWeb), ) for { - err := cli.Ping(ctx, cfg.Runner.Name) + _, err := cli.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{ + Data: cfg.Runner.Name, + })) select { case <-ctx.Done(): return nil diff --git a/go.mod b/go.mod index 5002e23..03564de 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,19 @@ module gitea.com/gitea/act_runner go 1.18 require ( - gitea.com/gitea/proto-go v0.0.0-20220903092234-20f71c2df67e - github.com/bufbuild/connect-go v0.3.0 + gitea.com/gitea/proto-go v0.0.0-20220925101213-1ac8a05257e1 + github.com/avast/retry-go/v4 v4.1.0 + github.com/bufbuild/connect-go v0.5.0 github.com/docker/docker v20.10.17+incompatible github.com/joho/godotenv v1.4.0 github.com/kelseyhightower/envconfig v1.4.0 - github.com/mattn/go-isatty v0.0.14 + github.com/mattn/go-isatty v0.0.16 github.com/nektos/act v0.2.26 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 golang.org/x/net v0.0.0-20220403103023-749bd193bc2b - golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 + golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde + google.golang.org/protobuf v1.28.1 ) require ( @@ -34,7 +36,7 @@ require ( github.com/go-git/gcfg v1.5.0 // indirect github.com/go-git/go-billy/v5 v5.3.1 // indirect github.com/go-git/go-git/v5 v5.4.2 // indirect - github.com/go-ini/ini v1.66.6 // indirect + github.com/go-ini/ini v1.67.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/imdario/mergo v0.3.12 // indirect @@ -43,10 +45,10 @@ require ( github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/mattn/go-colorable v0.1.12 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect - github.com/moby/buildkit v0.10.3 // indirect + github.com/moby/buildkit v0.10.4 // indirect github.com/moby/sys/mount v0.3.1 // indirect github.com/moby/sys/mountinfo v0.6.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -54,20 +56,19 @@ require ( github.com/opencontainers/runc v1.1.2 // indirect github.com/opencontainers/selinux v1.10.1 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/rhysd/actionlint v1.6.15 // indirect - github.com/rivo/uniseg v0.2.0 // indirect + github.com/rhysd/actionlint v1.6.17 // indirect + github.com/rivo/uniseg v0.3.4 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/sergi/go-diff v1.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xanzy/ssh-agent v0.3.1 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/nektos/act v0.2.26 => gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee +replace github.com/nektos/act => gitea.com/gitea/act v0.0.0-20220922135643-52a5bba9e7fa diff --git a/go.sum b/go.sum index 7190a45..ef85cc7 100644 --- a/go.sum +++ b/go.sum @@ -23,10 +23,10 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY= -gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY= -gitea.com/gitea/proto-go v0.0.0-20220903092234-20f71c2df67e h1:xlNITjAs+Ce6QR4mo0BvHTzTdkpqgS7zwfLpEi31mIM= -gitea.com/gitea/proto-go v0.0.0-20220903092234-20f71c2df67e/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/act v0.0.0-20220922135643-52a5bba9e7fa h1:HHqlvfIvqFlny3sgJgAM1BYeLNr1uM4yXtvF7aAoYK8= +gitea.com/gitea/act v0.0.0-20220922135643-52a5bba9e7fa/go.mod h1:9W/Nz16tjfnWp7O5DUo3EjZBnZFBI/5rlWstX4o7+hU= +gitea.com/gitea/proto-go v0.0.0-20220925101213-1ac8a05257e1 h1:JGApntYc07jawNxrxv1WhU6IHX0i73nqhloZlaUR5Nc= +gitea.com/gitea/proto-go v0.0.0-20220925101213-1ac8a05257e1/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= @@ -91,6 +91,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/avast/retry-go/v4 v4.1.0 h1:CwudD9anYv6JMVnDuTRlK6kLo4dBamiL+F3U8YDiyfg= +github.com/avast/retry-go/v4 v4.1.0/go.mod h1:HqmLvS2VLdStPCGDFjSuZ9pzlTqVRldCI4w2dO4m1Ms= github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -103,8 +105,8 @@ github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= -github.com/bufbuild/connect-go v0.3.0 h1:B0vyZrTR11SNEYpodL6P0NzluDCsuqmr8CNKblXvVto= -github.com/bufbuild/connect-go v0.3.0/go.mod h1:4efZ2eXFENwd4p7tuLaL9m0qtTsCOzuBvrohvRGevDM= +github.com/bufbuild/connect-go v0.5.0 h1:JFbWPWpasBqzM5h/awoRhAXmLERZQlQ5xTn42uf+5ao= +github.com/bufbuild/connect-go v0.5.0/go.mod h1:ZEtBnQ7J/m7bvWOW+H8T/+hKQCzPVfhhhICuvtcnjlI= github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= @@ -321,8 +323,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/go-ini/ini v1.66.6 h1:h6k2Bb0HWS/BXXHCXj4QHjxPmlIU4NK+7MuLp9SD+4k= -github.com/go-ini/ini v1.66.6/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= @@ -505,12 +507,13 @@ github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A= github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= @@ -525,8 +528,8 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A= -github.com/moby/buildkit v0.10.3 h1:/dGykD8FW+H4p++q5+KqKEo6gAkYKyBQHdawdjVwVAU= -github.com/moby/buildkit v0.10.3/go.mod h1:jxeOuly98l9gWHai0Ojrbnczrk/rf+o9/JqNhY+UCSo= +github.com/moby/buildkit v0.10.4 h1:FvC+buO8isGpUFZ1abdSLdGHZVqg9sqI4BbFL8tlzP4= +github.com/moby/buildkit v0.10.4/go.mod h1:Yajz9vt1Zw5q9Pp4pdb3TCSUXJBIroIQGQ3TTs/sLug= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/sys/mount v0.3.1 h1:RX1K0x95oR8j5P1YefKDt7tE1C2kCCixV0H8Aza3GaI= github.com/moby/sys/mount v0.3.1/go.mod h1:6IZknFQiqjLpwuYJD5Zk0qYEuJiws36M88MIXnZHya0= @@ -604,6 +607,7 @@ github.com/opencontainers/selinux v1.10.1/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuh github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0/go.mod h1:4xpMLz7RBWyB+ElzHu8Llua96TRCB3YwX+l5EP1wmHk= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -641,10 +645,12 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rhysd/actionlint v1.6.15 h1:IxQIp10aVce77jNnoHye7NFka8/7CRBSvKXoMRGryXM= -github.com/rhysd/actionlint v1.6.15/go.mod h1:R4ZRjgsIrnsT1CPU/4MdiIBzfJgMKJFd4qqGUERI098= -github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rhysd/actionlint v1.6.17 h1:U6Idx7mLvWFBnaL8tm8oye0fdBUH9ehgoxG25bBy9p8= +github.com/rhysd/actionlint v1.6.17/go.mod h1:fCaKErUfJqiyFdeUp3858hULCCupHWlPRB3RhIHS6pY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.3.4 h1:3Z3Eu6FGHZWSfNKJTOUiPatWwfc7DzJRU04jFUqJODw= +github.com/rivo/uniseg v0.3.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -736,6 +742,7 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= @@ -845,6 +852,7 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210326060303-6b1517762897/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220403103023-749bd193bc2b h1:vI32FkLJNAWtGD4BwkThwEy6XS7ZLLMHkSkYfF8M0W0= golang.org/x/net v0.0.0-20220403103023-749bd193bc2b/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -864,8 +872,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= -golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -938,13 +946,16 @@ golang.org/x/sys v0.0.0-20210502180810-71e4cd670f79/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211107104306-e0b2ad06fe42/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 h1:Sx/u41w+OwrInGdEckYmEuU5gHoGSL4QbDz3S9s6j4U= +golang.org/x/sys v0.0.0-20220818161305-2296e01440c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1009,6 +1020,7 @@ golang.org/x/tools v0.0.0-20200916195026-c9a70fc28ce3/go.mod h1:z6u4i615ZeAfBE4X golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/poller/poller.go b/poller/poller.go index 90e1add..77132ed 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -7,10 +7,11 @@ import ( "gitea.com/gitea/act_runner/client" runnerv1 "gitea.com/gitea/proto-go/runner/v1" + "github.com/bufbuild/connect-go" log "github.com/sirupsen/logrus" ) -func New(cli client.Client, dispatch func(context.Context, *runnerv1.Stage) error, filter *client.Filter) *Poller { +func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error, filter *client.Filter) *Poller { return &Poller{ Client: cli, Filter: filter, @@ -22,18 +23,18 @@ func New(cli client.Client, dispatch func(context.Context, *runnerv1.Stage) erro type Poller struct { Client client.Client Filter *client.Filter - Dispatch func(context.Context, *runnerv1.Stage) error + Dispatch func(context.Context, *runnerv1.Task) error routineGroup *routineGroup } func (p *Poller) Poll(ctx context.Context, n int) error { // register new runner. - _, err := p.Client.Register(ctx, &runnerv1.RegisterRequest{ + _, err := p.Client.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{ Os: p.Filter.OS, Arch: p.Filter.Arch, Capacity: int64(p.Filter.Capacity), - }) + })) if err != nil { log.WithError(err).Error("poller: cannot register new runner") return err @@ -74,12 +75,12 @@ func (p *Poller) poll(ctx context.Context, thread int) error { // request a new build stage for execution from the central // build server. - stage, err := p.Client.Request(ctx, &runnerv1.RequestRequest{ + resp, err := p.Client.Request(ctx, connect.NewRequest(&runnerv1.RequestRequest{ Kind: p.Filter.Kind, Os: p.Filter.OS, Arch: p.Filter.Arch, Type: p.Filter.Type, - }) + })) if err == context.Canceled || err == context.DeadlineExceeded { logger.WithError(err).Trace("poller: no stage returned") return nil @@ -90,9 +91,9 @@ func (p *Poller) poll(ctx context.Context, thread int) error { // exit if a nil or empty stage is returned from the system // and allow the runner to retry. - if stage == nil || stage.Id == 0 { + if resp.Msg.Task == nil || resp.Msg.Task.Id == 0 { return nil } - return p.Dispatch(ctx, stage) + return p.Dispatch(ctx, resp.Msg.Task) } diff --git a/runtime/reporter.go b/runtime/reporter.go new file mode 100644 index 0000000..b530f85 --- /dev/null +++ b/runtime/reporter.go @@ -0,0 +1,263 @@ +package runtime + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "gitea.com/gitea/act_runner/client" + runnerv1 "gitea.com/gitea/proto-go/runner/v1" + + "github.com/avast/retry-go/v4" + "github.com/bufbuild/connect-go" + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type Reporter struct { + ctx context.Context + closed bool + client client.Client + clientM sync.Mutex + + logOffset int + logRows []*runnerv1.LogRow + state *runnerv1.TaskState + stateM sync.RWMutex +} + +func NewReporter(ctx context.Context, client client.Client, taskID int64) *Reporter { + return &Reporter{ + ctx: ctx, + client: client, + state: &runnerv1.TaskState{ + Id: taskID, + }, + } +} + +func (r *Reporter) ResetSteps(l int) { + r.stateM.Lock() + defer r.stateM.Unlock() + for i := 0; i < l; i++ { + r.state.Steps = append(r.state.Steps, &runnerv1.StepState{ + Id: int64(i), + }) + } +} + +func (r *Reporter) Levels() []log.Level { + return log.AllLevels +} + +func (r *Reporter) Fire(entry *log.Entry) error { + r.stateM.Lock() + defer r.stateM.Unlock() + + timestamp := entry.Time + if r.state.StartedAt == nil { + r.state.StartedAt = timestamppb.New(timestamp) + } + + var step *runnerv1.StepState + if v, ok := entry.Data["stepNumber"]; ok { + if v, ok := v.(int); ok { + step = r.state.Steps[v] + } + } + + if step == nil { + if v, ok := entry.Data["jobResult"]; ok { + if v, ok := v.(string); ok { + if jobResult := r.parseResult(v); jobResult != runnerv1.Result_RESULT_UNSPECIFIED { + r.state.Result = jobResult + r.state.StoppedAt = timestamppb.New(timestamp) + for _, s := range r.state.Steps { + if s.Result == runnerv1.Result_RESULT_UNSPECIFIED { + s.Result = runnerv1.Result_RESULT_CANCELLED + } + } + } + } + } + if !r.duringSteps() { + r.logRows = append(r.logRows, r.parseLogRow(entry)) + } + return nil + } + + if step.StartedAt == nil { + step.StartedAt = timestamppb.New(timestamp) + } + + if v, ok := entry.Data["raw_output"]; ok { + if rawOutput, ok := v.(bool); ok && rawOutput { + if step.LogLength == 0 { + step.LogIndex = int64(r.logOffset + len(r.logRows)) + } + step.LogLength++ + r.logRows = append(r.logRows, r.parseLogRow(entry)) + return nil + } + } + + log.Info(entry.Data) + + if v, ok := entry.Data["stepResult"]; ok { + if v, ok := v.(string); ok { + if stepResult := r.parseResult(v); stepResult != runnerv1.Result_RESULT_UNSPECIFIED { + step.Result = stepResult + step.StoppedAt = timestamppb.New(timestamp) + } + } + } + + return nil +} + +func (r *Reporter) RunDaemon() { + if r.closed { + return + } + if r.ctx.Err() != nil { + return + } + + _ = r.ReportLog(false) + _ = r.ReportState() + + time.AfterFunc(time.Second, r.RunDaemon) +} + +func (r *Reporter) Logf(format string, a ...interface{}) { + r.stateM.Lock() + defer r.stateM.Unlock() + + if !r.duringSteps() { + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: fmt.Sprintf(format, a...), + }) + } +} + +func (r *Reporter) Close(lastWords string) error { + r.closed = true + + r.stateM.Lock() + if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { + if lastWords == "" { + lastWords = "Early termination" + } + for _, v := range r.state.Steps { + if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { + v.Result = runnerv1.Result_RESULT_CANCELLED + } + } + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: lastWords, + }) + return nil + } else if lastWords != "" { + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: lastWords, + }) + } + r.stateM.Unlock() + + if err := retry.Do(func() error { + if err := r.ReportLog(true); err != nil { + return err + } + return r.ReportState() + }, retry.Context(r.ctx)); err != nil { + return err + } + return nil +} + +func (r *Reporter) ReportLog(noMore bool) error { + r.clientM.Lock() + defer r.clientM.Unlock() + + r.stateM.RLock() + rows := r.logRows + r.stateM.RUnlock() + + resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{ + TaskId: r.state.Id, + Index: int64(r.logOffset), + Rows: rows, + NoMore: noMore, + })) + if err != nil { + return err + } + + ack := int(resp.Msg.AckIndex) + if ack < r.logOffset { + return fmt.Errorf("submitted logs are lost") + } + + r.stateM.Lock() + r.logRows = r.logRows[ack-r.logOffset:] + r.logOffset = ack + r.stateM.Unlock() + + if noMore && ack < r.logOffset+len(rows) { + return fmt.Errorf("not all logs are submitted") + } + + return nil +} + +func (r *Reporter) ReportState() error { + r.clientM.Lock() + defer r.clientM.Unlock() + + r.stateM.RLock() + state := proto.Clone(r.state).(*runnerv1.TaskState) + r.stateM.RUnlock() + + _, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ + State: state, + })) + return err +} + +func (r *Reporter) duringSteps() bool { + if steps := r.state.Steps; len(steps) == 0 { + return false + } else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 { + return false + } else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED { + return false + } + return true +} + +func (r *Reporter) parseResult(s string) runnerv1.Result { + switch s { + case "success": + return runnerv1.Result_RESULT_SUCCESS + case "failure": + return runnerv1.Result_RESULT_FAILURE + case "skipped": + return runnerv1.Result_RESULT_SKIPPED + case "cancelled": + return runnerv1.Result_RESULT_CANCELLED + } + return runnerv1.Result_RESULT_UNSPECIFIED +} + +func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow { + return &runnerv1.LogRow{ + Time: timestamppb.New(entry.Time), + Content: strings.TrimSuffix(entry.Message, "\r\n"), + } +} diff --git a/runtime/runtime.go b/runtime/runtime.go index f9d7977..86484f2 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -7,6 +7,7 @@ import ( "gitea.com/gitea/act_runner/client" runnerv1 "gitea.com/gitea/proto-go/runner/v1" + "github.com/bufbuild/connect-go" log "github.com/sirupsen/logrus" ) @@ -26,37 +27,31 @@ type Runner struct { } // Run runs the pipeline stage. -func (s *Runner) Run(ctx context.Context, stage *runnerv1.Stage) error { +func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error { l := log. - WithField("stage.id", stage.Id). - WithField("stage.name", stage.Name) + WithField("task.id", task.Id) l.Info("start running pipeline") // update machine in stage - stage.Machine = s.Machine - data, err := s.Client.Detail(ctx, &runnerv1.DetailRequest{ - Stage: stage, - }) + task.Machine = s.Machine + data, err := s.Client.Detail(ctx, connect.NewRequest(&runnerv1.DetailRequest{ + Task: task, + })) if err != nil && err == ErrDataLock { - l.Info("stage accepted by another runner") + l.Info("task accepted by another runner") return nil } if err != nil { - l.WithError(err).Error("cannot accept stage") + l.WithError(err).Error("cannot accept task") return err } - l = log.WithField("repo.id", data.Repo.Id). - WithField("repo.name", data.Repo.Name). - WithField("build.id", data.Build.Id). - WithField("build.name", data.Build.Name) + l.Info("task details fetched") - l.Info("stage details fetched") - - return s.run(ctx, data) + return s.run(ctx, data.Msg.Task) } -func (s *Runner) run(ctx context.Context, data *runnerv1.DetailResponse) error { - return NewTask(data.Build.Id, s.Client).Run(ctx, data) +func (s *Runner) run(ctx context.Context, task *runnerv1.Task) error { + return NewTask(task.Id, s.Client).Run(ctx, task) } diff --git a/runtime/task.go b/runtime/task.go index 13764be..15b27f9 100644 --- a/runtime/task.go +++ b/runtime/task.go @@ -1,12 +1,12 @@ package runtime import ( + "bytes" "context" + "encoding/json" "fmt" "os" "path/filepath" - "sync" - "time" "gitea.com/gitea/act_runner/client" runnerv1 "gitea.com/gitea/proto-go/runner/v1" @@ -57,38 +57,6 @@ type TaskInput struct { EnvFile string } -type taskLogHook struct { - entries []*log.Entry - lock sync.Mutex -} - -func (h *taskLogHook) Levels() []log.Level { - return log.AllLevels -} - -func (h *taskLogHook) Fire(entry *log.Entry) error { - if flag, ok := entry.Data["raw_output"]; ok { - h.lock.Lock() - if flagVal, ok := flag.(bool); flagVal && ok { - log.Infof("task log: %s", entry.Message) - h.entries = append(h.entries, entry) - } - h.lock.Unlock() - } - return nil -} - -func (h *taskLogHook) swapLogs() []*log.Entry { - if len(h.entries) == 0 { - return nil - } - h.lock.Lock() - entries := h.entries - h.entries = nil - h.lock.Unlock() - return entries -} - type TaskState int const ( @@ -112,13 +80,12 @@ type Task struct { BuildID int64 Input *TaskInput - logHook *taskLogHook - state TaskState - client client.Client - log *log.Entry + state TaskState + client client.Client + log *log.Entry } -// newTask creates a new task +// NewTask creates a new task func NewTask(buildID int64, client client.Client) *Task { task := &Task{ Input: &TaskInput{ @@ -127,10 +94,9 @@ func NewTask(buildID int64, client client.Client) *Task { }, BuildID: buildID, - state: TaskStatePending, - client: client, - log: log.WithField("buildID", buildID), - logHook: &taskLogHook{}, + state: TaskStatePending, + client: client, + log: log.WithField("buildID", buildID), } task.Input.repoDirectory, _ = os.Getwd() return task @@ -157,139 +123,96 @@ func demoPlatforms() map[string]string { } } -// reportFailure reports the failure of the task -func (t *Task) reportFailure(ctx context.Context, err error) { - t.state = TaskStateFailure - finishTask(t.BuildID) - - t.log.Errorf("task failed: %v", err) - - if t.client == nil { - // TODO: fill the step request - stepRequest := &runnerv1.UpdateStepRequest{} - _ = t.client.UpdateStep(ctx, stepRequest) - return - } -} - -func (t *Task) startReporting(ctx context.Context, interval int64) { - for { - time.Sleep(time.Duration(interval) * time.Second) - if t.state == TaskStateSuccess || t.state == TaskStateFailure { - t.log.Debugf("task reporting stopped") - break - } - t.reportStep(ctx) - } -} - -// reportStep reports the step of the task -func (t *Task) reportStep(ctx context.Context) { - if t.client == nil { - return - } - logValues := t.logHook.swapLogs() - if len(logValues) == 0 { - t.log.Debugf("no log to report") - return - } - t.log.Infof("reporting %d logs", len(logValues)) - - // TODO: fill the step request - stepRequest := &runnerv1.UpdateStepRequest{} - _ = t.client.UpdateStep(ctx, stepRequest) -} - -// reportSuccess reports the success of the task -func (t *Task) reportSuccess(ctx context.Context) { - t.state = TaskStateSuccess - finishTask(t.BuildID) - - t.log.Infof("task success") - - if t.client == nil { - return - } - - // TODO: fill the step request - stepRequest := &runnerv1.UpdateStepRequest{} - _ = t.client.UpdateStep(ctx, stepRequest) -} - -func (t *Task) Run(ctx context.Context, data *runnerv1.DetailResponse) error { - _, exist := globalTaskMap.Load(data.Build.Id) +func (t *Task) Run(ctx context.Context, task *runnerv1.Task) error { + _, exist := globalTaskMap.Load(task.Id) if exist { - return fmt.Errorf("task %d already exists", data.Build.Id) + return fmt.Errorf("task %d already exists", task.Id) } // set task ve to global map // when task is done or canceled, it will be removed from the map - globalTaskMap.Store(data.Build.Id, t) + globalTaskMap.Store(task.Id, t) + defer globalTaskMap.Delete(task.Id) + + lastWords := "" + reporter := NewReporter(ctx, t.client, task.Id) + defer func() { + _ = reporter.Close(lastWords) + }() + reporter.RunDaemon() + + reporter.Logf("received task %v of job %v", task.Id, task.Context.Fields["job"].GetStringValue()) workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory) if err != nil { - t.reportFailure(ctx, err) + lastWords = err.Error() return err } t.log.Debugf("workflows path: %s", workflowsPath) - planner, err := model.NewWorkflowPlanner(workflowsPath, false) + workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) if err != nil { - t.reportFailure(ctx, err) + lastWords = err.Error() return err } - var eventName string - events := planner.GetEvents() - if len(events) > 0 { - // set default event type to first event - // this way user dont have to specify the event. - t.log.Debugf("Using detected workflow event: %s", events[0]) - eventName = events[0] + var plan *model.Plan + if jobIDs := workflow.GetJobIDs(); len(jobIDs) != 1 { + err := fmt.Errorf("multiple jobs fould: %v", jobIDs) + lastWords = err.Error() + return err } else { - if plan := planner.PlanEvent("push"); plan != nil { - eventName = "push" - } + jobID := jobIDs[0] + plan = model.CombineWorkflowPlanner(workflow).PlanJob(jobID) + + job := workflow.GetJob(jobID) + reporter.ResetSteps(len(job.Steps)) } - // build the plan for this run - var plan *model.Plan - jobID := "" - if t.BuildID > 0 { - jobID = fmt.Sprintf("%d", t.BuildID) - } - if jobID != "" { - t.log.Infof("Planning job: %s", jobID) - plan = planner.PlanJob(jobID) - } else { - t.log.Infof("Planning event: %s", eventName) - plan = planner.PlanEvent(eventName) - } + log.Infof("plan: %+v", plan.Stages[0].Runs) curDir, err := os.Getwd() if err != nil { - t.reportFailure(ctx, err) + lastWords = err.Error() + return err + } + + dataContext := task.Context.Fields + preset := &model.GithubContext{ + Event: dataContext["event"].GetStructValue().AsMap(), + RunID: dataContext["run_id"].GetStringValue(), + RunNumber: dataContext["run_number"].GetStringValue(), + Actor: dataContext["actor"].GetStringValue(), + Repository: dataContext["repository"].GetStringValue(), + EventName: dataContext["event_name"].GetStringValue(), + Sha: dataContext["sha"].GetStringValue(), + Ref: dataContext["ref"].GetStringValue(), + RefName: dataContext["ref_name"].GetStringValue(), + RefType: dataContext["ref_type"].GetStringValue(), + HeadRef: dataContext["head_ref"].GetStringValue(), + BaseRef: dataContext["base_ref"].GetStringValue(), + Token: dataContext["token"].GetStringValue(), + RepositoryOwner: dataContext["repository_owner"].GetStringValue(), + RetentionDays: dataContext["retention_days"].GetStringValue(), + } + eventJSON, err := json.Marshal(preset.Event) + if err != nil { + lastWords = err.Error() return err } - // run the plan input := t.Input config := &runner.Config{ - Actor: input.actor, - EventName: eventName, - EventPath: "", - DefaultBranch: "", - ForcePull: input.forcePull, - ForceRebuild: input.forceRebuild, - ReuseContainers: input.reuseContainers, - Workdir: curDir, - BindWorkdir: input.bindWorkdir, - LogOutput: true, - JSONLogger: input.jsonLogger, - // Env: envs, - // Secrets: secrets, + Workdir: curDir, // TODO: temp dir? + BindWorkdir: input.bindWorkdir, + ReuseContainers: input.reuseContainers, + ForcePull: input.forcePull, + ForceRebuild: input.forceRebuild, + LogOutput: true, + JSONLogger: input.jsonLogger, + Secrets: task.Secrets, InsecureSecrets: input.insecureSecrets, - Platforms: demoPlatforms(), + Platforms: demoPlatforms(), // TODO: supported platforms Privileged: input.privileged, UsernsMode: input.usernsMode, ContainerArchitecture: input.containerArchitecture, @@ -302,11 +225,12 @@ func (t *Task) Run(ctx context.Context, data *runnerv1.DetailResponse) error { ArtifactServerPath: input.artifactServerPath, ArtifactServerPort: input.artifactServerPort, NoSkipCheckout: input.noSkipCheckout, - // RemoteName: input.remoteName, + PresetGitHubContext: preset, + EventJSON: string(eventJSON), } r, err := runner.New(config) if err != nil { - t.reportFailure(ctx, err) + lastWords = err.Error() return err } @@ -319,17 +243,15 @@ func (t *Task) Run(ctx context.Context, data *runnerv1.DetailResponse) error { }) t.log.Infof("workflow prepared") + reporter.Logf("workflow prepared") // add logger recorders - ctx = common.WithLoggerHook(ctx, t.logHook) - - go t.startReporting(ctx, 1) + ctx = common.WithLoggerHook(ctx, reporter) if err := executor(ctx); err != nil { - t.reportFailure(ctx, err) + lastWords = err.Error() return err } - t.reportSuccess(ctx) return nil }