almost perfectly supports async upload
This commit is contained in:
parent
e378ac29cd
commit
bcf0c06e6f
2
go.mod
2
go.mod
@ -26,6 +26,7 @@ require (
|
|||||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||||
github.com/go-playground/validator/v10 v10.22.0 // indirect
|
github.com/go-playground/validator/v10 v10.22.0 // indirect
|
||||||
github.com/goccy/go-json v0.10.3 // indirect
|
github.com/goccy/go-json v0.10.3 // indirect
|
||||||
|
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/leodido/go-urn v1.4.0 // indirect
|
github.com/leodido/go-urn v1.4.0 // indirect
|
||||||
@ -38,6 +39,7 @@ require (
|
|||||||
golang.org/x/arch v0.8.0 // indirect
|
golang.org/x/arch v0.8.0 // indirect
|
||||||
golang.org/x/crypto v0.25.0 // indirect
|
golang.org/x/crypto v0.25.0 // indirect
|
||||||
golang.org/x/net v0.27.0 // indirect
|
golang.org/x/net v0.27.0 // indirect
|
||||||
|
golang.org/x/sync v0.7.0 // indirect
|
||||||
golang.org/x/sys v0.22.0 // indirect
|
golang.org/x/sys v0.22.0 // indirect
|
||||||
golang.org/x/text v0.16.0 // indirect
|
golang.org/x/text v0.16.0 // indirect
|
||||||
google.golang.org/protobuf v1.34.2 // indirect
|
google.golang.org/protobuf v1.34.2 // indirect
|
||||||
|
90
main.go
90
main.go
@ -7,13 +7,14 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
"github.com/jaswdr/faker"
|
"github.com/jaswdr/faker"
|
||||||
)
|
)
|
||||||
|
|
||||||
var dbconn *pgx.Conn
|
var dbpool *pgxpool.Pool
|
||||||
var uploadFolder string
|
var uploadFolder string
|
||||||
|
|
||||||
func initUpload(c *gin.Context) {
|
func initUpload(c *gin.Context) {
|
||||||
@ -23,7 +24,7 @@ func initUpload(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, _ := dbconn.Query(context.Background(), "select filepath from videos")
|
rows, _ := dbpool.Query(context.Background(), "select filepath from videos")
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var filepath string
|
var filepath string
|
||||||
err = rows.Scan(&filepath)
|
err = rows.Scan(&filepath)
|
||||||
@ -42,6 +43,21 @@ func initUpload(c *gin.Context) {
|
|||||||
log.Panicf("initUpload: %v\n", err)
|
log.Panicf("initUpload: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sqlStmt := `
|
||||||
|
DROP TABLE IF EXISTS chunks;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS chunks
|
||||||
|
(
|
||||||
|
id integer,
|
||||||
|
fileName text,
|
||||||
|
chunk bytea
|
||||||
|
)
|
||||||
|
`
|
||||||
|
_, err = dbpool.Exec(context.Background(), sqlStmt)
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("initUpload: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
c.JSON(http.StatusOK, "Upload can proceed")
|
c.JSON(http.StatusOK, "Upload can proceed")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,31 +68,61 @@ func receiveChunk(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.OpenFile(fmt.Sprintf("%s/%s", uploadFolder, c.GetHeader("file-name")), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0640)
|
_, err = dbpool.Exec(context.Background(), "insert into chunks(id, fileName, chunk) values($1, $2, $3)", c.GetHeader("chunk-id"), c.GetHeader("file-name"), chunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(http.StatusInternalServerError, "")
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
log.Panicf("receiveChunk: %v\n", err)
|
log.Panicf("receiveChunk: %v\n", err)
|
||||||
}
|
}
|
||||||
if _, err := f.Write(chunk); err != nil {
|
|
||||||
c.JSON(http.StatusInternalServerError, "")
|
|
||||||
log.Panicf("receiveChunk: %v\n", err)
|
|
||||||
}
|
|
||||||
if err := f.Close(); err != nil {
|
|
||||||
c.JSON(http.StatusInternalServerError, "")
|
|
||||||
log.Panicf("receiveChunk: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(http.StatusOK, "Received chunk")
|
c.JSON(http.StatusOK, "Received chunk")
|
||||||
}
|
}
|
||||||
|
|
||||||
func finishUpload(c *gin.Context) {
|
func finishUpload(c *gin.Context) {
|
||||||
|
allChunks := map[int][]byte{}
|
||||||
|
|
||||||
fileName, err := io.ReadAll(c.Request.Body)
|
fileName, err := io.ReadAll(c.Request.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(http.StatusBadRequest, "Couldn't read html request body")
|
c.JSON(http.StatusBadRequest, "Couldn't read html request body")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = dbconn.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, fileName))
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
rows, _ := dbpool.Query(context.Background(), "select id, chunk from chunks where fileName like $1", fileName)
|
||||||
|
for rows.Next() {
|
||||||
|
var id int
|
||||||
|
var chunk []byte
|
||||||
|
err := rows.Scan(&id, &chunk)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
|
log.Panicf("finishUpload: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
allChunks[id] = chunk
|
||||||
|
}
|
||||||
|
err = rows.Err()
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
|
log.Panicf("finishUpload: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(allChunks); i++ {
|
||||||
|
f, err := os.OpenFile(fmt.Sprintf("%s/%s", uploadFolder, fileName), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0640)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
|
log.Panicf("finishUpload: %v\n", err)
|
||||||
|
}
|
||||||
|
if _, err := f.Write(allChunks[i]); err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
|
log.Panicf("finishUpload: %v\n", err)
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
|
log.Panicf("finishUpload: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = dbpool.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, fileName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(http.StatusInternalServerError, "")
|
c.JSON(http.StatusInternalServerError, "")
|
||||||
log.Panicf("finishUpload: %v\n", err)
|
log.Panicf("finishUpload: %v\n", err)
|
||||||
@ -88,7 +134,7 @@ func finishUpload(c *gin.Context) {
|
|||||||
func listVideos(c *gin.Context) {
|
func listVideos(c *gin.Context) {
|
||||||
allVideos := map[int]string{}
|
allVideos := map[int]string{}
|
||||||
|
|
||||||
rows, _ := dbconn.Query(context.Background(), "select * from videos")
|
rows, _ := dbpool.Query(context.Background(), "select * from videos")
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var id int
|
var id int
|
||||||
var filepath string
|
var filepath string
|
||||||
@ -109,7 +155,7 @@ func listVideos(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getVideo(c *gin.Context) {
|
func getVideo(c *gin.Context) {
|
||||||
rows, _ := dbconn.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id"))
|
rows, _ := dbpool.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id"))
|
||||||
rows.Next()
|
rows.Next()
|
||||||
err := rows.Err()
|
err := rows.Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -128,7 +174,7 @@ func getVideo(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func deleteVideo(c *gin.Context) {
|
func deleteVideo(c *gin.Context) {
|
||||||
rows, _ := dbconn.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id"))
|
rows, _ := dbpool.Query(context.Background(), "select filepath from videos where id = $1", c.Param("id"))
|
||||||
rows.Next()
|
rows.Next()
|
||||||
err := rows.Err()
|
err := rows.Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -143,7 +189,7 @@ func deleteVideo(c *gin.Context) {
|
|||||||
log.Panicf("deleteVideo: %v\n", err)
|
log.Panicf("deleteVideo: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = dbconn.Exec(context.Background(), "delete from videos where id = $1", c.Param("id"))
|
_, err = dbpool.Exec(context.Background(), "delete from videos where id = $1", c.Param("id"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(http.StatusInternalServerError, "Id was likely invalid")
|
c.JSON(http.StatusInternalServerError, "Id was likely invalid")
|
||||||
log.Panicf("deleteVideo: %v\n", err)
|
log.Panicf("deleteVideo: %v\n", err)
|
||||||
@ -159,11 +205,11 @@ func deleteVideo(c *gin.Context) {
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var err error
|
var err error
|
||||||
dbconn, err = pgx.Connect(context.Background(), "postgresql://postgres:postgres@172.18.0.3:5432/postgres")
|
dbpool, err = pgxpool.New(context.Background(), "postgresql://postgres:postgres@172.18.0.3:5432/postgres")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
defer dbconn.Close(context.Background())
|
defer dbpool.Close()
|
||||||
|
|
||||||
sqlStmt := `
|
sqlStmt := `
|
||||||
DROP TABLE IF EXISTS videos;
|
DROP TABLE IF EXISTS videos;
|
||||||
@ -175,7 +221,7 @@ func main() {
|
|||||||
CONSTRAINT videos_pkey PRIMARY KEY (id)
|
CONSTRAINT videos_pkey PRIMARY KEY (id)
|
||||||
)
|
)
|
||||||
`
|
`
|
||||||
_, err = dbconn.Exec(context.Background(), sqlStmt)
|
_, err = dbpool.Exec(context.Background(), sqlStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("main: %v\n", err)
|
log.Panicf("main: %v\n", err)
|
||||||
}
|
}
|
||||||
@ -183,7 +229,7 @@ func main() {
|
|||||||
faker := faker.New()
|
faker := faker.New()
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
_, err = dbconn.Exec(context.Background(), "insert into videos(filepath) values($1)", faker.File().AbsoluteFilePathForUnix(2))
|
_, err = dbpool.Exec(context.Background(), "insert into videos(filepath) values($1)", faker.File().AbsoluteFilePathForUnix(2))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("main: %v\n", err)
|
log.Panicf("main: %v\n", err)
|
||||||
}
|
}
|
||||||
@ -216,7 +262,7 @@ func main() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = dbconn.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, file.Name()))
|
_, err = dbpool.Exec(context.Background(), "insert into videos(filepath) values($1)", fmt.Sprintf("%s/%s", uploadFolder, file.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("main: %v\n", err)
|
log.Panicf("main: %v\n", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user