diff --git a/go.mod b/go.mod index 340a5db..233cd66 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.22.0 // indirect github.com/goccy/go-json v0.10.3 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -38,6 +39,7 @@ require ( golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.25.0 // indirect golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/main.go b/main.go index b1fb85b..b417fc2 100644 --- a/main.go +++ b/main.go @@ -7,13 +7,14 @@ import ( "log" "net/http" "os" + "time" "github.com/gin-gonic/gin" - "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "github.com/jaswdr/faker" ) -var dbconn *pgx.Conn +var dbpool *pgxpool.Pool var uploadFolder string func initUpload(c *gin.Context) { @@ -23,7 +24,7 @@ func initUpload(c *gin.Context) { return } - rows, _ := dbconn.Query(context.Background(), "select filepath from videos") + rows, _ := dbpool.Query(context.Background(), "select filepath from videos") for rows.Next() { var filepath string err = rows.Scan(&filepath) @@ -42,6 +43,21 @@ func initUpload(c *gin.Context) { log.Panicf("initUpload: %v\n", err) } + sqlStmt := ` + DROP TABLE IF EXISTS chunks; + + CREATE TABLE IF NOT EXISTS chunks + ( + id integer, + fileName text, + chunk bytea + ) + ` + _, err = dbpool.Exec(context.Background(), sqlStmt) + if err != nil { + log.Panicf("initUpload: %v\n", err) + } + c.JSON(http.StatusOK, "Upload can proceed") } @@ -52,31 +68,61 @@ func receiveChunk(c *gin.Context) { return } - f, err := os.OpenFile(fmt.Sprintf("%s/%s", uploadFolder, c.GetHeader("file-name")), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0640) + _, err = dbpool.Exec(context.Background(), "insert into chunks(id, fileName, chunk) values($1, $2, $3)", c.GetHeader("chunk-id"), c.GetHeader("file-name"), chunk) if err != nil { c.JSON(http.StatusInternalServerError, "") log.Panicf("receiveChunk: %v\n", err) } - if _, err := f.Write(chunk); err != nil { - c.JSON(http.StatusInternalServerError, "") - log.Panicf("receiveChunk: %v\n", err) - } - if err := f.Close(); err != nil { - c.JSON(http.StatusInternalServerError, "") - log.Panicf("receiveChunk: %v\n", err) - } c.JSON(http.StatusOK, "Received chunk") } func finishUpload(c *gin.Context) { + allChunks := map[int][]byte{} + fileName, err := io.ReadAll(c.Request.Body) if err != nil { c.JSON(http.StatusBadRequest, "Couldn't read html request body") return } - _, err = dbconn.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, fileName)) + time.Sleep(time.Second) + + rows, _ := dbpool.Query(context.Background(), "select id, chunk from chunks where fileName like $1", fileName) + for rows.Next() { + var id int + var chunk []byte + err := rows.Scan(&id, &chunk) + if err != nil { + c.JSON(http.StatusInternalServerError, "") + log.Panicf("finishUpload: %v\n", err) + } + + allChunks[id] = chunk + } + err = rows.Err() + if err != nil { + c.JSON(http.StatusInternalServerError, "") + log.Panicf("finishUpload: %v\n", err) + } + + for i := 0; i < len(allChunks); i++ { + f, err := os.OpenFile(fmt.Sprintf("%s/%s", uploadFolder, fileName), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0640) + if err != nil { + c.JSON(http.StatusInternalServerError, "") + log.Panicf("finishUpload: %v\n", err) + } + if _, err := f.Write(allChunks[i]); err != nil { + c.JSON(http.StatusInternalServerError, "") + log.Panicf("finishUpload: %v\n", err) + } + if err := f.Close(); err != nil { + c.JSON(http.StatusInternalServerError, "") + log.Panicf("finishUpload: %v\n", err) + } + } + + _, err = dbpool.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, fileName)) if err != nil { c.JSON(http.StatusInternalServerError, "") log.Panicf("finishUpload: %v\n", err) @@ -88,7 +134,7 @@ func finishUpload(c *gin.Context) { func listVideos(c *gin.Context) { allVideos := map[int]string{} - rows, _ := dbconn.Query(context.Background(), "select * from videos") + rows, _ := dbpool.Query(context.Background(), "select * from videos") for rows.Next() { var id int var filepath string @@ -109,7 +155,7 @@ func listVideos(c *gin.Context) { } func getVideo(c *gin.Context) { - rows, _ := dbconn.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id")) + rows, _ := dbpool.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id")) rows.Next() err := rows.Err() if err != nil { @@ -128,7 +174,7 @@ func getVideo(c *gin.Context) { } func deleteVideo(c *gin.Context) { - rows, _ := dbconn.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id")) + rows, _ := dbpool.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id")) rows.Next() err := rows.Err() if err != nil { @@ -143,7 +189,7 @@ func deleteVideo(c *gin.Context) { log.Panicf("deleteVideo: %v\n", err) } - _, err = dbconn.Exec(context.Background(), "delete from videos where id = $1", c.Param("id")) + _, err = dbpool.Exec(context.Background(), "delete from videos where id = $1", c.Param("id")) if err != nil { c.JSON(http.StatusInternalServerError, "Id was likely invalid") log.Panicf("deleteVideo: %v\n", err) @@ -159,11 +205,11 @@ func deleteVideo(c *gin.Context) { func main() { var err error - dbconn, err = pgx.Connect(context.Background(), "postgresql://postgres:postgres@172.18.0.3:5432/postgres") + dbpool, err = pgxpool.New(context.Background(), "postgresql://postgres:postgres@172.18.0.3:5432/postgres") if err != nil { log.Fatal(err) } - defer dbconn.Close(context.Background()) + defer dbpool.Close() sqlStmt := ` DROP TABLE IF EXISTS videos; @@ -175,7 +221,7 @@ func main() { CONSTRAINT videos_pkey PRIMARY KEY (id) ) ` - _, err = dbconn.Exec(context.Background(), sqlStmt) + _, err = dbpool.Exec(context.Background(), sqlStmt) if err != nil { log.Panicf("main: %v\n", err) } @@ -183,7 +229,7 @@ func main() { faker := faker.New() for i := 0; i < 10; i++ { - _, err = dbconn.Exec(context.Background(), "insert into videos(filepath) values($1)", faker.File().AbsoluteFilePathForUnix(2)) + _, err = dbpool.Exec(context.Background(), "insert into videos(filepath) values($1)", faker.File().AbsoluteFilePathForUnix(2)) if err != nil { log.Panicf("main: %v\n", err) } @@ -216,7 +262,7 @@ func main() { continue } - _, err = dbconn.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, file.Name())) + _, err = dbpool.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, file.Name())) if err != nil { log.Panicf("main: %v\n", err) }