From 2babadbc9492e15192e7fb4347fdbac8d86cc7e3 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 2 May 2022 17:02:51 +0800 Subject: [PATCH] Add websocket --- .gitea/workflows/lint.yml | 19 +++-- cmd/damon.go | 140 +++++++++++++++++++++++++++++++ cmd/root.go | 172 ++++++++++++++++++++------------------ go.mod | 1 + go.sum | 1 + 5 files changed, 244 insertions(+), 89 deletions(-) create mode 100644 cmd/damon.go diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/lint.yml index 6d08d12..97e5326 100644 --- a/.gitea/workflows/lint.yml +++ b/.gitea/workflows/lint.yml @@ -2,19 +2,20 @@ name: checks on: [push] env: - ACT_OWNER: ${{ github.repository_owner }} - ACT_REPOSITORY: ${{ github.repository }} - GO_VERSION: 1.18 - CGO_ENABLED: 0 + GOPROXY: https://goproxy.io,direct jobs: lint: name: lint runs-on: ubuntu-latest steps: + - uses: actions/setup-go@v3 + with: + go-version: 1.17 - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - uses: golangci/golangci-lint-action@v3.1.0 - with: - version: latest \ No newline at end of file + - uses: Jerome1337/golint-action@v1.0.2 + #- name: golangci-lint + # uses: golangci/golangci-lint-action@v3 + # with: + # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version + # version: v1.29 \ No newline at end of file diff --git a/cmd/damon.go b/cmd/damon.go new file mode 100644 index 0000000..5c15176 --- /dev/null +++ b/cmd/damon.go @@ -0,0 +1,140 @@ +package cmd + +import ( + "context" + "encoding/json" + "errors" + "strings" + "time" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" +) + +type Message struct { + Version int // + Type int // message type, 1 register 2 error + RunnerUUID string // runner uuid + ErrCode int // error code + ErrContent string // errors message + EventName string + EventPayload string +} + +func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { + log.Info().Msgf("Starting runner daemon") + + return func(cmd *cobra.Command, args []string) error { + var conn *websocket.Conn + var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var failedCnt int + for { + select { + case <-ctx.Done(): + if conn != nil { + err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Error().Msgf("write close: %v", err) + } + } + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return ctx.Err() + case <-ticker.C: + if conn == nil { + log.Trace().Msgf("trying connect %v", "ws://localhost:3000/api/actions") + conn, _, err = websocket.DefaultDialer.DialContext(ctx, "ws://localhost:3000/api/actions", nil) + if err != nil { + log.Error().Msgf("dial: %v", err) + break + } + + // register the client + msg := Message{ + Version: 1, + Type: 1, + RunnerUUID: "111111", + } + bs, err := json.Marshal(&msg) + if err != nil { + log.Error().Msgf("Marshal: %v", err) + break + } + + if err = conn.WriteMessage(websocket.TextMessage, bs); err != nil { + log.Error().Msgf("register failed: %v", err) + conn.Close() + conn = nil + break + } + } + + const timeout = time.Second * 10 + + for { + conn.SetReadDeadline(time.Now().Add(timeout)) + conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(timeout)); return nil }) + + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || + websocket.IsCloseError(err, websocket.CloseNormalClosure) { + log.Trace().Msgf("closed from remote") + conn.Close() + conn = nil + } else if !strings.Contains(err.Error(), "i/o timeout") { + log.Error().Msgf("read message failed: %#v", err) + } + failedCnt++ + if failedCnt > 60 { + if conn != nil { + conn.Close() + conn = nil + } + failedCnt = 0 + } + break + } + + // TODO: handle the message + var msg Message + if err = json.Unmarshal(message, &msg); err != nil { + log.Error().Msgf("unmarshal received message faild: %v", err) + continue + } + switch msg.Version { + case 1: + switch msg.Type { + case 1: + log.Info().Msgf("received message: %s", message) + case 2: + case 4: + log.Info().Msgf("received no task") + case 3: + switch msg.EventName { + case "push": + input := Input{ + forgeInstance: "github.com", + } + if err := runTask(context.Background(), &input, ""); err != nil { + log.Error().Msgf("run task failed: %v", err) + } + default: + log.Warn().Msgf("unknow event %s with payload %s", msg.EventName, msg.EventPayload) + } + + default: + log.Error().Msgf("received a message with an unsupported type: %#v", msg) + } + default: + log.Error().Msgf("recevied a message with an unsupported version, consider upgrade your runner") + } + } + } + } + } +} diff --git a/cmd/root.go b/cmd/root.go index 12ddc10..fc809e6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -70,6 +70,13 @@ func Execute(ctx context.Context) { Version: version, SilenceUsage: true, } + rootCmd.AddCommand(&cobra.Command{ + Aliases: []string{"daemon"}, + Use: "daemon [event name to run]\nIf no event name passed, will default to \"on: push\"", + Short: "Run GitHub actions locally by specifying the event name (e.g. `push`) or an action name directly.", + Args: cobra.MaximumNArgs(1), + RunE: runDaemon(ctx, &input), + }) rootCmd.Flags().BoolP("run", "r", false, "run workflows") rootCmd.Flags().StringP("job", "j", "", "run job") rootCmd.PersistentFlags().StringVarP(&input.forgeInstance, "forge-instance", "", "github.com", "Forge instance to use.") @@ -96,91 +103,96 @@ func getWorkflowsPath() (string, error) { return p, nil } +func runTask(ctx context.Context, input *Input, jobID string) error { + workflowsPath, err := getWorkflowsPath() + if err != nil { + return err + } + planner, err := model.NewWorkflowPlanner(workflowsPath, false) + if err != nil { + return err + } + + var eventName string + events := planner.GetEvents() + if len(events) > 0 { + // set default event type to first event + // this way user dont have to specify the event. + log.Debug().Msgf("Using detected workflow event: %s", events[0]) + eventName = events[0] + } else { + if plan := planner.PlanEvent("push"); plan != nil { + eventName = "push" + } + } + + // build the plan for this run + var plan *model.Plan + if jobID != "" { + log.Debug().Msgf("Planning job: %s", jobID) + plan = planner.PlanJob(jobID) + } else { + log.Debug().Msgf("Planning event: %s", eventName) + plan = planner.PlanEvent(eventName) + } + + curDir, err := os.Getwd() + if err != nil { + return err + } + + // run the plan + config := &runner.Config{ + Actor: input.actor, + EventName: eventName, + EventPath: "", + DefaultBranch: "", + ForcePull: input.forcePull, + ForceRebuild: input.forceRebuild, + ReuseContainers: input.reuseContainers, + Workdir: curDir, + BindWorkdir: input.bindWorkdir, + LogOutput: true, + JSONLogger: input.jsonLogger, + // Env: envs, + // Secrets: secrets, + InsecureSecrets: input.insecureSecrets, + Platforms: input.newPlatforms(), + Privileged: input.privileged, + UsernsMode: input.usernsMode, + ContainerArchitecture: input.containerArchitecture, + ContainerDaemonSocket: input.containerDaemonSocket, + UseGitIgnore: input.useGitIgnore, + GitHubInstance: input.forgeInstance, + ContainerCapAdd: input.containerCapAdd, + ContainerCapDrop: input.containerCapDrop, + AutoRemove: input.autoRemove, + ArtifactServerPath: input.artifactServerPath, + ArtifactServerPort: input.artifactServerPort, + NoSkipCheckout: input.noSkipCheckout, + // RemoteName: input.remoteName, + } + r, err := runner.New(config) + if err != nil { + return fmt.Errorf("New config failed: %v", err) + } + + cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) + + executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { + cancel() + return nil + }) + return executor(ctx) +} + func runCommand(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { - workflowsPath, err := getWorkflowsPath() - if err != nil { - return err - } - planner, err := model.NewWorkflowPlanner(workflowsPath, false) + jobID, err := cmd.Flags().GetString("job") if err != nil { return err } - var eventName string - events := planner.GetEvents() - if len(events) > 0 { - // set default event type to first event - // this way user dont have to specify the event. - log.Debug().Msgf("Using detected workflow event: %s", events[0]) - eventName = events[0] - } else { - if len(args) > 0 { - eventName = args[0] - } else if plan := planner.PlanEvent("push"); plan != nil { - eventName = "push" - } - } - - // build the plan for this run - var plan *model.Plan - if jobID, err := cmd.Flags().GetString("job"); err != nil { - return err - } else if jobID != "" { - log.Debug().Msgf("Planning job: %s", jobID) - plan = planner.PlanJob(jobID) - } else { - log.Debug().Msgf("Planning event: %s", eventName) - plan = planner.PlanEvent(eventName) - } - - curDir, err := os.Getwd() - if err != nil { - return err - } - - // run the plan - config := &runner.Config{ - Actor: input.actor, - EventName: eventName, - EventPath: "", - DefaultBranch: "", - ForcePull: input.forcePull, - ForceRebuild: input.forceRebuild, - ReuseContainers: input.reuseContainers, - Workdir: curDir, - BindWorkdir: input.bindWorkdir, - LogOutput: !input.noOutput, - JSONLogger: input.jsonLogger, - // Env: envs, - // Secrets: secrets, - InsecureSecrets: input.insecureSecrets, - Platforms: input.newPlatforms(), - Privileged: input.privileged, - UsernsMode: input.usernsMode, - ContainerArchitecture: input.containerArchitecture, - ContainerDaemonSocket: input.containerDaemonSocket, - UseGitIgnore: input.useGitIgnore, - GitHubInstance: input.forgeInstance, - ContainerCapAdd: input.containerCapAdd, - ContainerCapDrop: input.containerCapDrop, - AutoRemove: input.autoRemove, - ArtifactServerPath: input.artifactServerPath, - ArtifactServerPort: input.artifactServerPort, - NoSkipCheckout: input.noSkipCheckout, - // RemoteName: input.remoteName, - } - r, err := runner.New(config) - if err != nil { - return fmt.Errorf("New config failed: %v", err) - } - - cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) - - executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { - cancel() - return nil - }) - return executor(ctx) + return runTask(ctx, input, jobID) } } diff --git a/go.mod b/go.mod index 8f8ae6d..61c56c3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module gitea.com/gitea/act_runner go 1.18 require ( + github.com/gorilla/websocket v1.4.2 github.com/nektos/act v0.2.26 github.com/rs/zerolog v1.26.1 github.com/spf13/cobra v1.4.0 diff --git a/go.sum b/go.sum index dab85b2..5da7b86 100644 --- a/go.sum +++ b/go.sum @@ -736,6 +736,7 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE= github.com/gostaticanalysis/analysisutil v0.0.3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE=