Fix release assets upload retry logic
Enables up to 3 retries of uploading a single asset when encountering a network error or a HTTP 5xx error. Bonus: - simplifies ConcurrentUpload implementation - support Go context cancellation
This commit is contained in:
parent
cbeed671dd
commit
8cb312a620
5 changed files with 103 additions and 50 deletions
2
go.mod
2
go.mod
|
|
@ -36,7 +36,7 @@ require (
|
|||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.5
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/sync v0.1.0
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||
golang.org/x/text v0.3.8
|
||||
google.golang.org/grpc v1.49.0
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -342,8 +342,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
|
|||
|
|
@ -755,12 +755,12 @@ func Test_createRun(t *testing.T) {
|
|||
"upload_url": "https://api.github.com/assets/upload",
|
||||
"html_url": "https://github.com/OWNER/REPO/releases/tag/v1.2.3"
|
||||
}`))
|
||||
reg.Register(httpmock.REST("POST", "assets/upload"), httpmock.StatusStringResponse(500, `{}`))
|
||||
reg.Register(httpmock.REST("POST", "assets/upload"), httpmock.StatusStringResponse(422, `{}`))
|
||||
reg.Register(httpmock.REST("DELETE", "releases/123"), httpmock.StatusStringResponse(204, ``))
|
||||
},
|
||||
wantStdout: ``,
|
||||
wantStderr: ``,
|
||||
wantErr: `HTTP 500 (https://api.github.com/assets/upload?label=&name=ball.tgz)`,
|
||||
wantErr: `HTTP 422 (https://api.github.com/assets/upload?label=&name=ball.tgz)`,
|
||||
},
|
||||
{
|
||||
name: "clean up draft after publishing fails",
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
|
|
@ -13,8 +14,15 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/cli/cli/v2/api"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type httpDoer interface {
|
||||
Do(*http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
type errNetwork struct{ error }
|
||||
|
||||
type AssetForUpload struct {
|
||||
Name string
|
||||
Label string
|
||||
|
|
@ -90,64 +98,61 @@ func fileExt(fn string) string {
|
|||
return path.Ext(fn)
|
||||
}
|
||||
|
||||
func ConcurrentUpload(httpClient *http.Client, uploadURL string, numWorkers int, assets []*AssetForUpload) error {
|
||||
func ConcurrentUpload(httpClient httpDoer, uploadURL string, numWorkers int, assets []*AssetForUpload) error {
|
||||
if numWorkers == 0 {
|
||||
return errors.New("the number of concurrent workers needs to be greater than 0")
|
||||
}
|
||||
|
||||
jobs := make(chan AssetForUpload, len(assets))
|
||||
results := make(chan error, len(assets))
|
||||
|
||||
if len(assets) < numWorkers {
|
||||
numWorkers = len(assets)
|
||||
}
|
||||
|
||||
for w := 1; w <= numWorkers; w++ {
|
||||
go func() {
|
||||
for a := range jobs {
|
||||
results <- uploadWithDelete(httpClient, uploadURL, a)
|
||||
}
|
||||
}()
|
||||
}
|
||||
ctx := context.Background()
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
g.SetLimit(numWorkers)
|
||||
|
||||
for _, a := range assets {
|
||||
jobs <- *a
|
||||
asset := *a
|
||||
g.Go(func() error {
|
||||
return uploadWithDelete(gctx, httpClient, uploadURL, asset)
|
||||
})
|
||||
}
|
||||
close(jobs)
|
||||
|
||||
var uploadError error
|
||||
for i := 0; i < len(assets); i++ {
|
||||
if err := <-results; err != nil {
|
||||
uploadError = err
|
||||
}
|
||||
}
|
||||
return uploadError
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
var retryInterval = time.Millisecond * 200
|
||||
|
||||
const maxRetries = 3
|
||||
|
||||
func uploadWithDelete(httpClient *http.Client, uploadURL string, a AssetForUpload) error {
|
||||
func shouldRetry(err error) bool {
|
||||
var networkError errNetwork
|
||||
if errors.As(err, &networkError) {
|
||||
return true
|
||||
}
|
||||
var httpError api.HTTPError
|
||||
return errors.As(err, &httpError) && httpError.StatusCode >= 500
|
||||
}
|
||||
|
||||
func uploadWithDelete(ctx context.Context, httpClient httpDoer, uploadURL string, a AssetForUpload) error {
|
||||
if a.ExistingURL != "" {
|
||||
err := deleteAsset(httpClient, a.ExistingURL)
|
||||
if err != nil {
|
||||
if err := deleteAsset(ctx, httpClient, a.ExistingURL); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
retries := 0
|
||||
for {
|
||||
var httpError api.HTTPError
|
||||
_, err := uploadAsset(httpClient, uploadURL, a)
|
||||
// retry upload several times upon receiving HTTP 5xx
|
||||
if err == nil || !errors.As(err, &httpError) || httpError.StatusCode < 500 || retries < maxRetries {
|
||||
_, err := uploadAsset(ctx, httpClient, uploadURL, a)
|
||||
if err == nil || retries == maxRetries || !shouldRetry(err) {
|
||||
return err
|
||||
}
|
||||
retries++
|
||||
time.Sleep(time.Duration(retries) * time.Second)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(time.Duration(retries) * retryInterval):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func uploadAsset(httpClient *http.Client, uploadURL string, asset AssetForUpload) (*ReleaseAsset, error) {
|
||||
func uploadAsset(ctx context.Context, httpClient httpDoer, uploadURL string, asset AssetForUpload) (*ReleaseAsset, error) {
|
||||
u, err := url.Parse(uploadURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -163,7 +168,7 @@ func uploadAsset(httpClient *http.Client, uploadURL string, asset AssetForUpload
|
|||
}
|
||||
defer f.Close()
|
||||
|
||||
req, err := http.NewRequest("POST", u.String(), f)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -173,7 +178,7 @@ func uploadAsset(httpClient *http.Client, uploadURL string, asset AssetForUpload
|
|||
|
||||
resp, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errNetwork{err}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
|
|
@ -182,22 +187,17 @@ func uploadAsset(httpClient *http.Client, uploadURL string, asset AssetForUpload
|
|||
return nil, api.HandleHTTPError(resp)
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var newAsset ReleaseAsset
|
||||
err = json.Unmarshal(b, &newAsset)
|
||||
if err != nil {
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
if err := dec.Decode(&newAsset); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &newAsset, nil
|
||||
}
|
||||
|
||||
func deleteAsset(httpClient *http.Client, assetURL string) error {
|
||||
req, err := http.NewRequest("DELETE", assetURL, nil)
|
||||
func deleteAsset(ctx context.Context, httpClient httpDoer, assetURL string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "DELETE", assetURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,14 @@
|
|||
package shared
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Test_typeForFilename(t *testing.T) {
|
||||
tests := []struct {
|
||||
|
|
@ -67,3 +75,48 @@ func Test_typeForFilename(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_uploadWithDelete_retry(t *testing.T) {
|
||||
retryInterval = time.Millisecond
|
||||
ctx := context.Background()
|
||||
|
||||
tries := 0
|
||||
client := funcClient(func(req *http.Request) (*http.Response, error) {
|
||||
tries++
|
||||
if tries == 1 {
|
||||
return nil, errors.New("made up exception")
|
||||
} else if tries == 2 {
|
||||
return &http.Response{
|
||||
Request: req,
|
||||
StatusCode: 500,
|
||||
Body: io.NopCloser(bytes.NewBufferString(`{}`)),
|
||||
}, nil
|
||||
}
|
||||
return &http.Response{
|
||||
Request: req,
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString(`{}`)),
|
||||
}, nil
|
||||
})
|
||||
err := uploadWithDelete(ctx, client, "http://example.com/upload", AssetForUpload{
|
||||
Name: "asset",
|
||||
Label: "",
|
||||
Size: 8,
|
||||
Open: func() (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewBufferString(`somebody`)), nil
|
||||
},
|
||||
MIMEType: "application/octet-stream",
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("uploadWithDelete() error: %v", err)
|
||||
}
|
||||
if tries != 3 {
|
||||
t.Errorf("tries = %d, expected %d", tries, 3)
|
||||
}
|
||||
}
|
||||
|
||||
type funcClient func(*http.Request) (*http.Response, error)
|
||||
|
||||
func (f funcClient) Do(req *http.Request) (*http.Response, error) {
|
||||
return f(req)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue