act_runner/cmd/damon.go

225 lines
5.6 KiB
Go
Raw Normal View History

2022-05-02 11:02:51 +02:00
package cmd
import (
"context"
"encoding/json"
"errors"
2022-06-20 11:23:34 +02:00
"fmt"
2022-05-02 11:02:51 +02:00
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
type Message struct {
Version int //
Type int // message type, 1 register 2 error
RunnerUUID string // runner uuid
2022-06-20 10:37:28 +02:00
BuildUUID string // build uuid
2022-05-02 11:02:51 +02:00
ErrCode int // error code
ErrContent string // errors message
EventName string
EventPayload string
2022-06-20 10:37:28 +02:00
JobID string // only run the special job, empty means run all the jobs
2022-05-02 11:02:51 +02:00
}
2022-06-20 10:37:28 +02:00
const (
MsgTypeRegister = iota + 1 // register
MsgTypeError // error
MsgTypeRequestBuild // request build task
MsgTypeIdle // no task
MsgTypeBuildResult // build result
)
func handleVersion1(ctx context.Context, conn *websocket.Conn, message []byte, msg *Message) error {
2022-06-20 11:23:34 +02:00
switch msg.Type {
case MsgTypeRegister:
log.Info().Msgf("received registered success: %s", message)
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeError:
log.Info().Msgf("received error msessage: %s", message)
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeIdle:
log.Info().Msgf("received no task")
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeRequestBuild:
switch msg.EventName {
case "push":
input := Input{
forgeInstance: "github.com",
reuseContainers: true,
}
ctx, cancel := context.WithTimeout(ctx, time.Hour)
2022-06-20 11:23:34 +02:00
defer cancel()
done := make(chan error)
go func(chan error) {
done <- runTask(ctx, &input, "")
}(done)
c := time.NewTicker(time.Second)
defer c.Stop()
for {
select {
case <-ctx.Done():
2022-06-20 11:23:34 +02:00
cancel()
log.Info().Msgf("cancel task")
return nil
case err := <-done:
if err != nil {
log.Error().Msgf("runTask failed: %v", err)
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
ErrCode: 1,
ErrContent: err.Error(),
})
}
log.Error().Msgf("runTask success")
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
})
case <-c.C:
}
}
default:
return fmt.Errorf("unknow event %s with payload %s", msg.EventName, msg.EventPayload)
}
default:
return fmt.Errorf("received a message with an unsupported type: %#v", msg)
}
}
// TODO: handle the message
func handleMessage(ctx context.Context, conn *websocket.Conn, message []byte) error {
2022-06-20 11:23:34 +02:00
var msg Message
if err := json.Unmarshal(message, &msg); err != nil {
return fmt.Errorf("unmarshal received message faild: %v", err)
}
switch msg.Version {
case 1:
return handleVersion1(ctx, conn, message, &msg)
2022-06-20 11:23:34 +02:00
default:
return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner")
}
}
2022-05-02 11:02:51 +02:00
func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
log.Info().Msgf("Starting runner daemon")
return func(cmd *cobra.Command, args []string) error {
var conn *websocket.Conn
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var failedCnt int
for {
select {
case <-ctx.Done():
log.Info().Msgf("cancel task")
2022-05-02 11:02:51 +02:00
if conn != nil {
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Error().Msgf("write close: %v", err)
}
}
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()
case <-ticker.C:
if conn == nil {
log.Trace().Msgf("trying connect %v", "ws://localhost:3000/api/actions")
conn, _, err = websocket.DefaultDialer.DialContext(ctx, "ws://localhost:3000/api/actions", nil)
if err != nil {
log.Error().Msgf("dial: %v", err)
break
}
// register the client
msg := Message{
Version: 1,
2022-06-20 10:37:28 +02:00
Type: MsgTypeRegister,
2022-05-02 11:02:51 +02:00
RunnerUUID: "111111",
}
bs, err := json.Marshal(&msg)
if err != nil {
log.Error().Msgf("Marshal: %v", err)
break
}
if err = conn.WriteMessage(websocket.TextMessage, bs); err != nil {
log.Error().Msgf("register failed: %v", err)
conn.Close()
conn = nil
break
}
}
const timeout = time.Second * 10
for {
2022-06-20 11:23:34 +02:00
select {
case <-ctx.Done():
log.Info().Msg("cancel task")
2022-06-20 11:23:34 +02:00
return nil
default:
}
2022-07-20 08:42:22 +02:00
_ = conn.SetReadDeadline(time.Now().Add(timeout))
conn.SetPongHandler(func(string) error {
return conn.SetReadDeadline(time.Now().Add(timeout))
})
2022-05-02 11:02:51 +02:00
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) ||
websocket.IsCloseError(err, websocket.CloseNormalClosure) {
log.Trace().Msgf("closed from remote")
conn.Close()
conn = nil
} else if !strings.Contains(err.Error(), "i/o timeout") {
log.Error().Msgf("read message failed: %#v", err)
}
failedCnt++
if failedCnt > 60 {
if conn != nil {
conn.Close()
conn = nil
}
failedCnt = 0
}
2022-06-20 11:30:17 +02:00
break
2022-05-02 11:02:51 +02:00
}
if err := handleMessage(ctx, conn, message); err != nil {
2022-06-20 11:30:17 +02:00
log.Error().Msgf(err.Error())
2022-05-02 11:02:51 +02:00
}
}
}
}
}
}