//go:build go1.8
// +build go1.8
package s3manager_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
random "math/rand"
"net/http"
"net/http/httptest"
"os"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/awsutil"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/awstesting"
"github.com/aws/aws-sdk-go/awstesting/unit"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/internal/s3testing"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// emptyList is passed to loggingSvc by tests that want every operation
// recorded (no operation names are ignored).
var emptyList = []string{}
// respMsg is the canned body placed in every mocked HTTP response by the
// Send handlers in this file.
// NOTE(review): the content looks like markup whose tags were stripped —
// confirm against the upstream fixture.
const respMsg = `
mockValue
mockValue
mockValue
mockValue
`
// val looks up path s inside i using awsutil.ValuesAtPath and returns the
// first match. It returns nil when the lookup fails or matches nothing.
// io.Reader values are returned untouched; any other pointer is dereferenced
// so callers can compare against plain values.
func val(i interface{}, s string) interface{} {
	found, err := awsutil.ValuesAtPath(i, s)
	if err != nil || len(found) == 0 {
		return nil
	}

	first := found[0]
	if _, isReader := first.(io.Reader); isReader {
		return first
	}

	rv := reflect.ValueOf(first)
	if rv.Kind() != reflect.Ptr {
		return first
	}
	return rv.Elem().Interface()
}
// contains reports whether s is equal to one of the elements of src.
func contains(src []string, s string) bool {
	for idx := 0; idx < len(src); idx++ {
		if src[idx] == s {
			return true
		}
	}
	return false
}
func loggingSvc(ignoreOps []string) (*s3.S3, *[]string, *[]interface{}) {
var m sync.Mutex
partNum := 0
names := []string{}
params := []interface{}{}
svc := s3.New(unit.Session)
svc.Handlers.Unmarshal.Clear()
svc.Handlers.UnmarshalMeta.Clear()
svc.Handlers.UnmarshalError.Clear()
svc.Handlers.Send.Clear()
svc.Handlers.Send.PushBack(func(r *request.Request) {
m.Lock()
defer m.Unlock()
if !contains(ignoreOps, r.Operation.Name) {
names = append(names, r.Operation.Name)
params = append(params, r.Params)
}
r.HTTPResponse = &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader([]byte(respMsg))),
}
switch data := r.Data.(type) {
case *s3.CreateMultipartUploadOutput:
data.UploadId = aws.String("UPLOAD-ID")
case *s3.UploadPartOutput:
partNum++
data.ETag = aws.String(fmt.Sprintf("ETAG%d", partNum))
case *s3.CompleteMultipartUploadOutput:
data.Location = aws.String("https://location")
data.VersionId = aws.String("VERSION-ID")
data.ETag = aws.String("ETAG")
case *s3.PutObjectOutput:
data.VersionId = aws.String("VERSION-ID")
data.ETag = aws.String("ETAG")
}
})
return svc, &names, ¶ms
}
// buflen drains i, which must hold an io.Reader, and returns how many bytes
// were read from it. A read error is ignored; the count read so far is
// returned.
func buflen(i interface{}) int {
	total, _ := io.Copy(ioutil.Discard, i.(io.Reader))
	return int(total)
}
// TestUploadOrderMulti uploads the buf12MB fixture with a default uploader
// and checks the recorded operation sequence, the values reported on the
// upload output, and the parameters sent with the individual requests.
func TestUploadOrderMulti(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	u := s3manager.NewUploaderWithClient(s)

	resp, err := u.Upload(&s3manager.UploadInput{
		Bucket:               aws.String("Bucket"),
		Key:                  aws.String("Key - value"),
		Body:                 bytes.NewReader(buf12MB),
		ServerSideEncryption: aws.String("aws:kms"),
		SSEKMSKeyId:          aws.String("KmsId"),
		ContentType:          aws.String("content/type"),
	})
	if err != nil {
		// resp is dereferenced below; stop instead of panicking on nil.
		t.Fatalf("Expected no error but received %v", err)
	}

	expected := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}
	if !reflect.DeepEqual(expected, *ops) {
		// args is indexed by position below; a different operation count
		// would turn into an index-out-of-range panic.
		t.Fatalf("Expected %v, but received %v", expected, *ops)
	}

	if e, a := `https://s3.mock-region.amazonaws.com/Bucket/Key%20-%20value`, resp.Location; e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
	if "UPLOAD-ID" != resp.UploadID {
		t.Errorf("Expected %q, but received %q", "UPLOAD-ID", resp.UploadID)
	}
	if "VERSION-ID" != *resp.VersionID {
		t.Errorf("Expected %q, but received %q", "VERSION-ID", *resp.VersionID)
	}
	if "ETAG" != *resp.ETag {
		t.Errorf("Expected %q, but received %q", "ETAG", *resp.ETag)
	}

	// Validate input values

	// UploadPart (args[1] through args[3])
	for i := 1; i <= 3; i++ {
		v := val((*args)[i], "UploadId")
		if "UPLOAD-ID" != v {
			t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
		}
	}

	// CompleteMultipartUpload (args[4])
	v := val((*args)[4], "UploadId")
	if "UPLOAD-ID" != v {
		t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
	}
	for i := 0; i < 3; i++ {
		e := val((*args)[4], fmt.Sprintf("MultipartUpload.Parts[%d].PartNumber", i))
		if int64(i+1) != e.(int64) {
			t.Errorf("Expected %d, but received %d", i+1, e)
		}
	}

	vals := []string{
		val((*args)[4], "MultipartUpload.Parts[0].ETag").(string),
		val((*args)[4], "MultipartUpload.Parts[1].ETag").(string),
		val((*args)[4], "MultipartUpload.Parts[2].ETag").(string),
	}
	// Compile the pattern once rather than on every iteration.
	etagPattern := regexp.MustCompile(`^ETAG\d+$`)
	for _, a := range vals {
		if !etagPattern.MatchString(a) {
			t.Errorf("Failed regexp expression `^ETAG\\d+$`")
		}
	}

	// Custom headers (sent with CreateMultipartUpload, args[0])
	e := val((*args)[0], "ServerSideEncryption")
	if e != "aws:kms" {
		t.Errorf("Expected %q, but received %q", "aws:kms", e)
	}
	e = val((*args)[0], "SSEKMSKeyId")
	if e != "KmsId" {
		t.Errorf("Expected %q, but received %q", "KmsId", e)
	}
	e = val((*args)[0], "ContentType")
	if e != "content/type" {
		t.Errorf("Expected %q, but received %q", "content/type", e)
	}
}
// TestUploadOrderMultiDifferentPartSize uploads the buf12MB fixture with a
// 7MB part size and a single worker, and expects two parts of 7MB and 5MB.
func TestUploadOrderMultiDifferentPartSize(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.PartSize = 1024 * 1024 * 7
		u.Concurrency = 1
	})

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(buf12MB),
	})
	if err != nil {
		t.Errorf("Expected no error but received %v", err)
	}

	vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}
	if !reflect.DeepEqual(vals, *ops) {
		// args[1] and args[2] are read below; bail out rather than risk an
		// index-out-of-range panic when fewer operations were recorded.
		t.Fatalf("Expected %v, but received %v", vals, *ops)
	}

	// Part lengths
	if n := buflen(val((*args)[1], "Body")); 1024*1024*7 != n {
		t.Errorf("Expected %d, but received %d", 1024*1024*7, n)
	}
	if n := buflen(val((*args)[2], "Body")); 1024*1024*5 != n {
		t.Errorf("Expected %d, but received %d", 1024*1024*5, n)
	}
}
// TestUploadIncreasePartSize limits the uploader to two parts and uploads the
// buf12MB fixture. It expects the configured PartSize on the manager to be
// left at the upload default while the parts actually sent are 6MB+1 and
// 6MB-1 bytes long.
func TestUploadIncreasePartSize(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.Concurrency = 1
		u.MaxUploadParts = 2
	})

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(buf12MB),
	})
	if err != nil {
		t.Errorf("Expected no error but received %v", err)
	}

	// mgr is an Uploader, so its untouched PartSize is the upload default,
	// not the download default.
	if int64(s3manager.DefaultUploadPartSize) != mgr.PartSize {
		t.Errorf("Expected %d, but received %d", s3manager.DefaultUploadPartSize, mgr.PartSize)
	}

	vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}
	if !reflect.DeepEqual(vals, *ops) {
		// args[1] and args[2] are read below; avoid an index panic.
		t.Fatalf("Expected %v, but received %v", vals, *ops)
	}

	// Part lengths
	if n := buflen(val((*args)[1], "Body")); (1024*1024*6)+1 != n {
		t.Errorf("Expected %d, but received %d", (1024*1024*6)+1, n)
	}
	if n := buflen(val((*args)[2], "Body")); (1024*1024*6)-1 != n {
		t.Errorf("Expected %d, but received %d", (1024*1024*6)-1, n)
	}
}
// TestUploadFailIfPartSizeTooSmall configures a 5 byte part size and expects
// Upload to return no output and a "ConfigError" whose message mentions the
// minimum part size.
func TestUploadFailIfPartSizeTooSmall(t *testing.T) {
	mgr := s3manager.NewUploader(unit.Session, func(u *s3manager.Uploader) {
		u.PartSize = 5
	})

	resp, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(buf12MB),
	})
	if resp != nil {
		t.Errorf("Expected response to be nil, but received %v", resp)
	}
	if err == nil {
		// The error is inspected below; a nil error cannot be asserted on.
		t.Fatal("Expected error, but received nil")
	}

	aerr, ok := err.(awserr.Error)
	if !ok {
		t.Fatalf("Expected awserr.Error, but received %T: %v", err, err)
	}
	if e, a := "ConfigError", aerr.Code(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
	if e, a := "part size must be at least", aerr.Message(); !strings.Contains(a, e) {
		t.Errorf("expect %v to be in %v", e, a)
	}
}
// TestUploadOrderSingle uploads the buf2MB fixture and expects a single
// PutObject call, the fixed values from the mock on the output, no upload ID,
// and the encryption/content-type settings forwarded on the request.
func TestUploadOrderSingle(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	resp, err := mgr.Upload(&s3manager.UploadInput{
		Bucket:               aws.String("Bucket"),
		Key:                  aws.String("Key - value"),
		Body:                 bytes.NewReader(buf2MB),
		ServerSideEncryption: aws.String("aws:kms"),
		SSEKMSKeyId:          aws.String("KmsId"),
		ContentType:          aws.String("content/type"),
	})
	if err != nil {
		// resp is dereferenced below; stop instead of panicking on nil.
		t.Fatalf("Expected no error but received %v", err)
	}

	if vals := []string{"PutObject"}; !reflect.DeepEqual(vals, *ops) {
		// args[0] is read below; avoid an index panic.
		t.Fatalf("Expected %v, but received %v", vals, *ops)
	}

	if e, a := `https://s3.mock-region.amazonaws.com/Bucket/Key%20-%20value`, resp.Location; e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
	if e := "VERSION-ID"; e != *resp.VersionID {
		t.Errorf("Expected %q, but received %q", e, *resp.VersionID)
	}
	if "ETAG" != *resp.ETag {
		t.Errorf("Expected %q, but received %q", "ETAG", *resp.ETag)
	}
	if len(resp.UploadID) > 0 {
		t.Errorf("Expected empty string, but received %q", resp.UploadID)
	}

	if e, a := "aws:kms", val((*args)[0], "ServerSideEncryption").(string); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
	if e, a := "KmsId", val((*args)[0], "SSEKMSKeyId").(string); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
	if e, a := "content/type", val((*args)[0], "ContentType").(string); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
}
// TestUploadOrderSingleFailure turns every mocked response into a 400 and
// expects the single PutObject attempt to fail with a nil upload output.
func TestUploadOrderSingleFailure(t *testing.T) {
	svc, recorded, _ := loggingSvc(emptyList)
	// Pushed after the recording handler, so the response already exists.
	svc.Handlers.Send.PushBack(func(req *request.Request) {
		req.HTTPResponse.StatusCode = 400
	})

	uploader := s3manager.NewUploaderWithClient(svc)
	input := &s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(buf2MB),
	}
	out, err := uploader.Upload(input)

	if err == nil {
		t.Error("Expected error, but receievd nil")
	}

	want := []string{"PutObject"}
	if got := *recorded; !reflect.DeepEqual(want, got) {
		t.Errorf("Expected %v, but received %v", want, got)
	}

	if out != nil {
		t.Errorf("Expected response to be nil, but received %v", out)
	}
}
// TestUploadOrderZero uploads an empty body and expects one PutObject call
// with a zero-length request body, a non-empty Location and no upload ID.
func TestUploadOrderZero(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	resp, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(make([]byte, 0)),
	})
	if err != nil {
		// resp is dereferenced below; stop instead of panicking on nil.
		t.Fatalf("Expected no error but received %v", err)
	}

	if vals := []string{"PutObject"}; !reflect.DeepEqual(vals, *ops) {
		// args[0] is read below; avoid an index panic.
		t.Fatalf("Expected %v, but received %v", vals, *ops)
	}

	if len(resp.Location) == 0 {
		t.Error("Expected Location to not be empty")
	}
	if len(resp.UploadID) > 0 {
		t.Errorf("Expected empty string, but received %q", resp.UploadID)
	}
	if e, a := 0, buflen(val((*args)[0], "Body")); e != a {
		t.Errorf("Expected %d, but received %d", e, a)
	}
}
// TestUploadOrderMultiFailure fails the UploadPart response tagged "ETAG2"
// with a 400 and expects the upload to error out and be aborted after the
// second part.
func TestUploadOrderMultiFailure(t *testing.T) {
	s, ops, _ := loggingSvc(emptyList)
	s.Handlers.Send.PushBack(func(r *request.Request) {
		// Named data rather than t so the *testing.T is not shadowed.
		switch data := r.Data.(type) {
		case *s3.UploadPartOutput:
			if *data.ETag == "ETAG2" {
				r.HTTPResponse.StatusCode = 400
			}
		}
	})

	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.Concurrency = 1
	})
	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(buf12MB),
	})
	if err == nil {
		t.Error("Expected error, but received nil")
	}

	if e, a := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "AbortMultipartUpload"}, *ops; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}
}
// TestUploadOrderMultiFailureOnComplete answers CompleteMultipartUpload with
// a 400 and expects the upload to fail and an abort to follow the complete.
func TestUploadOrderMultiFailureOnComplete(t *testing.T) {
	svc, recorded, _ := loggingSvc(emptyList)
	svc.Handlers.Send.PushBack(func(req *request.Request) {
		if _, isComplete := req.Data.(*s3.CompleteMultipartUploadOutput); isComplete {
			req.HTTPResponse.StatusCode = 400
		}
	})

	singleWorker := func(u *s3manager.Uploader) {
		u.Concurrency = 1
	}
	uploader := s3manager.NewUploaderWithClient(svc, singleWorker)

	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(buf12MB),
	})
	if err == nil {
		t.Error("Expected error, but receievd nil")
	}

	want := []string{
		"CreateMultipartUpload",
		"UploadPart",
		"UploadPart",
		"UploadPart",
		"CompleteMultipartUpload",
		"AbortMultipartUpload",
	}
	if got := *recorded; !reflect.DeepEqual(want, got) {
		t.Errorf("Expected %v, but received %v", want, got)
	}
}
// TestUploadOrderMultiFailureOnCreate answers CreateMultipartUpload with a
// 400 and expects the upload to fail with no further operations recorded.
func TestUploadOrderMultiFailureOnCreate(t *testing.T) {
	svc, recorded, _ := loggingSvc(emptyList)
	svc.Handlers.Send.PushBack(func(req *request.Request) {
		if _, isCreate := req.Data.(*s3.CreateMultipartUploadOutput); isCreate {
			req.HTTPResponse.StatusCode = 400
		}
	})

	uploader := s3manager.NewUploaderWithClient(svc)
	payload := make([]byte, 1024*1024*12)
	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(payload),
	})
	if err == nil {
		t.Error("Expected error, but receievd nil")
	}

	want := []string{"CreateMultipartUpload"}
	if got := *recorded; !reflect.DeepEqual(want, got) {
		t.Errorf("Expected %v, but received %v", want, got)
	}
}
// TestUploadOrderMultiFailureLeaveParts fails the UploadPart response tagged
// "ETAG2" with LeavePartsOnError enabled and expects no abort to be recorded.
func TestUploadOrderMultiFailureLeaveParts(t *testing.T) {
	svc, recorded, _ := loggingSvc(emptyList)
	svc.Handlers.Send.PushBack(func(req *request.Request) {
		part, isPart := req.Data.(*s3.UploadPartOutput)
		if isPart && *part.ETag == "ETAG2" {
			req.HTTPResponse.StatusCode = 400
		}
	})

	uploader := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
		u.LeavePartsOnError = true
		u.Concurrency = 1
	})

	payload := make([]byte, 1024*1024*12)
	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   bytes.NewReader(payload),
	})
	if err == nil {
		t.Error("Expected error, but receievd nil")
	}

	want := []string{"CreateMultipartUpload", "UploadPart", "UploadPart"}
	if got := *recorded; !reflect.DeepEqual(want, got) {
		t.Errorf("Expected %v, but received %v", want, got)
	}
}
// failreader is an io.Reader that claims to fill the caller's buffer until
// its call counter reaches times; that call and every later one fail with
// "random failure".
type failreader struct {
	times     int // call number at which Read starts failing
	failCount int // number of Read calls made so far
}

// Read counts the call, then either reports len(b) bytes read (without
// writing to b) or returns the failure error.
func (f *failreader) Read(b []byte) (int, error) {
	f.failCount++
	if f.failCount < f.times {
		return len(b), nil
	}
	return 0, fmt.Errorf("random failure")
}
// TestUploadOrderReadFail1 uploads from a reader that fails on its first
// read and expects a "ReadRequestBody" error wrapping "random failure" with
// no operations recorded.
func TestUploadOrderReadFail1(t *testing.T) {
	s, ops, _ := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &failreader{times: 1},
	})

	// The checked assertion also covers a nil error, which would otherwise
	// panic instead of failing the test.
	aerr, ok := err.(awserr.Error)
	if !ok {
		t.Fatalf("Expected awserr.Error, but received %T: %v", err, err)
	}
	if e, a := "ReadRequestBody", aerr.Code(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}

	orig := aerr.OrigErr()
	if orig == nil {
		t.Fatal("Expected an original error, but received nil")
	}
	if e, a := "random failure", orig.Error(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}

	if e, a := []string{}, *ops; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}
}
// TestUploadOrderReadFail2 uploads from a reader that fails on its second
// read and expects a "MultipartUpload" error wrapping a "ReadRequestBody"
// error, with the upload created and then aborted. UploadPart calls are
// excluded from the recording.
func TestUploadOrderReadFail2(t *testing.T) {
	s, ops, _ := loggingSvc([]string{"UploadPart"})
	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.Concurrency = 1
	})

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &failreader{times: 2},
	})

	// Checked assertions turn a nil or unexpected error into a test failure
	// instead of a panic.
	aerr, ok := err.(awserr.Error)
	if !ok {
		t.Fatalf("Expected awserr.Error, but received %T: %v", err, err)
	}
	if e, a := "MultipartUpload", aerr.Code(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}

	orig, ok := aerr.OrigErr().(awserr.Error)
	if !ok {
		t.Fatalf("Expected original error to be awserr.Error, but received %T", aerr.OrigErr())
	}
	if e, a := "ReadRequestBody", orig.Code(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
	if errStr := orig.Error(); !strings.Contains(errStr, "random failure") {
		t.Errorf("Expected error to contain 'random failure', but was %q", errStr)
	}

	if e, a := []string{"CreateMultipartUpload", "AbortMultipartUpload"}, *ops; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}
}
// sizedReader is an io.Reader that reports size bytes in total without
// writing anything into the caller's buffer. Once exhausted it returns err,
// which defaults to io.EOF when left unset.
type sizedReader struct {
	size int   // total number of bytes to report
	cur  int   // running total of buffer lengths requested so far
	err  error // returned once the reader is exhausted
}

// Read reports up to len(p) bytes, trimming the final read so the overall
// total never exceeds size.
func (s *sizedReader) Read(p []byte) (n int, err error) {
	if s.cur < s.size {
		s.cur += len(p)
		n = len(p)
		if overshoot := s.cur - s.size; overshoot > 0 {
			n -= overshoot
		}
		return n, nil
	}

	if s.err == nil {
		s.err = io.EOF
	}
	return 0, s.err
}
// TestUploadOrderMultiBufferedReader uploads 12MB from a non-seekable
// sizedReader and expects three parts of 5MB, 5MB and 2MB.
func TestUploadOrderMultiBufferedReader(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &sizedReader{size: 1024 * 1024 * 12},
	})
	if err != nil {
		t.Errorf("Expected no error but received %v", err)
	}

	if e, a := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops; !reflect.DeepEqual(e, a) {
		// args[1..3] are read below; avoid an index panic.
		t.Fatalf("Expected %v, but received %v", e, a)
	}

	// Part lengths, sorted because parts may be recorded in any order.
	parts := []int{
		buflen(val((*args)[1], "Body")),
		buflen(val((*args)[2], "Body")),
		buflen(val((*args)[3], "Body")),
	}
	sort.Ints(parts)

	if e, a := []int{1024 * 1024 * 2, 1024 * 1024 * 5, 1024 * 1024 * 5}, parts; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}
}
// TestUploadOrderMultiBufferedReaderPartial uploads 12MB from a sizedReader
// whose terminal error is explicitly io.EOF and expects three parts of 5MB,
// 5MB and 2MB.
func TestUploadOrderMultiBufferedReaderPartial(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &sizedReader{size: 1024 * 1024 * 12, err: io.EOF},
	})
	if err != nil {
		t.Errorf("Expected no error but received %v", err)
	}

	if e, a := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops; !reflect.DeepEqual(e, a) {
		// args[1..3] are read below; avoid an index panic.
		t.Fatalf("Expected %v, but received %v", e, a)
	}

	// Part lengths, sorted because parts may be recorded in any order.
	parts := []int{
		buflen(val((*args)[1], "Body")),
		buflen(val((*args)[2], "Body")),
		buflen(val((*args)[3], "Body")),
	}
	sort.Ints(parts)

	if e, a := []int{1024 * 1024 * 2, 1024 * 1024 * 5, 1024 * 1024 * 5}, parts; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}
}
// TestUploadOrderMultiBufferedReaderEOF tests the edge case where the
// file size is an exact multiple of the part size: 10MB is expected to be
// sent as exactly two 5MB parts.
func TestUploadOrderMultiBufferedReaderEOF(t *testing.T) {
	s, ops, args := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &sizedReader{size: 1024 * 1024 * 10, err: io.EOF},
	})
	if err != nil {
		t.Errorf("Expected no error but received %v", err)
	}

	if e, a := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops; !reflect.DeepEqual(e, a) {
		// args[1] and args[2] are read below; avoid an index panic.
		t.Fatalf("Expected %v, but received %v", e, a)
	}

	// Part lengths
	parts := []int{
		buflen(val((*args)[1], "Body")),
		buflen(val((*args)[2], "Body")),
	}
	sort.Ints(parts)

	if e, a := []int{1024 * 1024 * 5, 1024 * 1024 * 5}, parts; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}
}
// TestUploadOrderMultiBufferedReaderExceedTotalParts uploads 12MB from a
// non-seekable reader with MaxUploadParts set to 2 and expects the upload to
// be aborted with a "MultipartUpload" error wrapping "TotalPartsExceeded".
// UploadPart calls are excluded from the recording.
func TestUploadOrderMultiBufferedReaderExceedTotalParts(t *testing.T) {
	s, ops, _ := loggingSvc([]string{"UploadPart"})
	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.Concurrency = 1
		u.MaxUploadParts = 2
	})

	resp, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &sizedReader{size: 1024 * 1024 * 12},
	})
	if err == nil {
		// The error is inspected below; a nil error cannot be asserted on.
		t.Fatal("Expected an error, but received nil")
	}
	if resp != nil {
		t.Errorf("Expected nil, but received %v", resp)
	}

	if e, a := []string{"CreateMultipartUpload", "AbortMultipartUpload"}, *ops; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}

	aerr, ok := err.(awserr.Error)
	if !ok {
		t.Fatalf("Expected awserr.Error, but received %T: %v", err, err)
	}
	if e, a := "MultipartUpload", aerr.Code(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}

	orig, ok := aerr.OrigErr().(awserr.Error)
	if !ok {
		t.Fatalf("Expected original error to be awserr.Error, but received %T", aerr.OrigErr())
	}
	if e, a := "TotalPartsExceeded", orig.Code(); e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}

	if !strings.Contains(aerr.Error(), "configured MaxUploadParts (2)") {
		t.Errorf("Expected error to contain 'configured MaxUploadParts (2)', but received %q", aerr.Error())
	}
}
// TestUploadOrderSingleBufferedReader uploads 2MB from a non-seekable reader
// and expects a single PutObject call, a non-empty Location and no upload ID.
func TestUploadOrderSingleBufferedReader(t *testing.T) {
	s, ops, _ := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s)

	resp, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &sizedReader{size: 1024 * 1024 * 2},
	})
	if err != nil {
		// resp is dereferenced below; stop instead of panicking on nil.
		t.Fatalf("Expected no error but received %v", err)
	}

	if e, a := []string{"PutObject"}, *ops; !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %v, but received %v", e, a)
	}

	if len(resp.Location) == 0 {
		t.Error("Expected a value in Location but received empty string")
	}
	if len(resp.UploadID) > 0 {
		t.Errorf("Expected empty string but received %q", resp.UploadID)
	}
}
// TestUploadZeroLenObject uploads an empty string against a local httptest
// server and expects a request to actually be made, a non-empty Location and
// no upload ID.
func TestUploadZeroLenObject(t *testing.T) {
	requestMade := false
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		requestMade = true
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()

	mgr := s3manager.NewUploaderWithClient(s3.New(unit.Session, &aws.Config{
		Endpoint: aws.String(server.URL),
	}))
	resp, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   strings.NewReader(""),
	})
	if err != nil {
		// resp is dereferenced below; stop instead of panicking on nil.
		t.Fatalf("Expected no error but received %v", err)
	}

	if !requestMade {
		t.Error("Expected request to have been made, but was not")
	}
	if len(resp.Location) == 0 {
		t.Error("Expected a non-empty string value for Location")
	}
	if len(resp.UploadID) > 0 {
		t.Errorf("Expected empty string, but received %q", resp.UploadID)
	}
}
func TestUploadInputS3PutObjectInputPairity(t *testing.T) {
matchings := compareStructType(reflect.TypeOf(s3.PutObjectInput{}),
reflect.TypeOf(s3manager.UploadInput{}))
aOnly := []string{}
bOnly := []string{}
for k, c := range matchings {
if c == 1 && k != "ContentLength" {
aOnly = append(aOnly, k)
} else if c == 2 {
bOnly = append(bOnly, k)
}
}
if len(aOnly) > 0 {
t.Errorf("Expected empty array, but received %v", aOnly)
}
if len(bOnly) > 0 {
t.Errorf("Expected empty array, but received %v", bOnly)
}
}
// testIncompleteReader is an io.Reader that reports Size bytes in total and
// then ends with io.ErrUnexpectedEOF instead of io.EOF. It never writes into
// the caller's buffer.
type testIncompleteReader struct {
	Size int64 // total number of bytes the reader reports
	read int64 // bytes reported so far
}

// Read reports len(p) bytes while more than len(p) bytes remain. The read
// that reaches Size reports only the bytes still remaining together with
// io.ErrUnexpectedEOF; later reads report 0 bytes and the same error, so the
// returned count never exceeds len(p).
func (r *testIncompleteReader) Read(p []byte) (n int, err error) {
	remaining := r.Size - r.read
	if remaining < 0 {
		remaining = 0
	}
	if int64(len(p)) >= remaining {
		r.read = r.Size
		return int(remaining), io.ErrUnexpectedEOF
	}
	r.read += int64(len(p))
	return len(p), nil
}
// TestUploadUnexpectedEOF uploads from a reader that ends with
// io.ErrUnexpectedEOF one byte past the first part and expects the upload to
// fail after being created, with an abort as the last recorded operation.
func TestUploadUnexpectedEOF(t *testing.T) {
	s, ops, _ := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.Concurrency = 1
		u.PartSize = s3manager.MinUploadPartSize
	})

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body: &testIncompleteReader{
			Size: int64(s3manager.MinUploadPartSize + 1),
		},
	})
	if err == nil {
		t.Error("Expected error, but received none")
	}

	// The first and last recorded operations are indexed below; fail cleanly
	// instead of panicking when nothing was recorded.
	if len(*ops) == 0 {
		t.Fatal("Expected at least one operation to be recorded, but received none")
	}

	// Ensure upload started.
	if e, a := "CreateMultipartUpload", (*ops)[0]; e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}

	// Part may or may not be sent because of timing of sending parts and
	// reading next part in upload manager. Just check for the last abort.
	if e, a := "AbortMultipartUpload", (*ops)[len(*ops)-1]; e != a {
		t.Errorf("Expected %q, but received %q", e, a)
	}
}
// compareStructType tallies the exported field names of struct types a and b.
// Each name is mapped to a score: 1 is added when the name is a field of a
// and 2 when it is a field of b. It panics unless both types are structs.
func compareStructType(a, b reflect.Type) map[string]int {
	if !(a.Kind() == reflect.Struct && b.Kind() == reflect.Struct) {
		panic(fmt.Sprintf("types must both be structs, got %v and %v", a.Kind(), b.Kind()))
	}

	matchings := map[string]int{}
	for _, field := range enumFields(a) {
		matchings[field.Name]++
	}
	for _, field := range enumFields(b) {
		matchings[field.Name] += 2
	}
	return matchings
}
// enumFields returns the exported fields of struct type v in declaration
// order. The result is never nil.
func enumFields(v reflect.Type) []reflect.StructField {
	exported := []reflect.StructField{}
	for idx, total := 0, v.NumField(); idx < total; idx++ {
		// PkgPath is only set for unexported fields.
		if f := v.Field(idx); f.PkgPath == "" {
			exported = append(exported, f)
		}
	}
	return exported
}
// fooReaderAt is a stub body that offers both Read and ReadAt. Neither method
// touches the buffer it is given; both always report 12 bytes and io.EOF.
type fooReaderAt struct{}
// Read ignores p and always returns 12, io.EOF.
func (r *fooReaderAt) Read(p []byte) (n int, err error) {
return 12, io.EOF
}
// ReadAt ignores p and off and always returns 12, io.EOF.
func (r *fooReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
return 12, io.EOF
}
// TestReaderAt uploads a fooReaderAt body and expects the outgoing request
// to carry a Content-Length header of "12".
func TestReaderAt(t *testing.T) {
	client := s3.New(unit.Session)
	client.Handlers.Unmarshal.Clear()
	client.Handlers.UnmarshalMeta.Clear()
	client.Handlers.UnmarshalError.Clear()
	client.Handlers.Send.Clear()

	// Captured from the request seen by the mock Send handler.
	var contentLen string
	client.Handlers.Send.PushBack(func(req *request.Request) {
		contentLen = req.HTTPRequest.Header.Get("Content-Length")
		req.HTTPResponse = &http.Response{
			StatusCode: 200,
			Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
		}
	})

	uploader := s3manager.NewUploaderWithClient(client, func(u *s3manager.Uploader) {
		u.Concurrency = 1
	})

	input := &s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   &fooReaderAt{},
	}
	if _, err := uploader.Upload(input); err != nil {
		t.Errorf("Expected no error but received %v", err)
	}

	if want := "12"; want != contentLen {
		t.Errorf("Expected %q, but received %q", want, contentLen)
	}
}
// TestSSE uploads 10MB with customer-provided encryption settings and checks
// that every UploadPart request carries SSECustomerAlgorithm and
// SSECustomerKey.
func TestSSE(t *testing.T) {
	svc := s3.New(unit.Session)
	svc.Handlers.Unmarshal.Clear()
	svc.Handlers.UnmarshalMeta.Clear()
	svc.Handlers.UnmarshalError.Clear()
	svc.Handlers.ValidateResponse.Clear()
	svc.Handlers.Send.Clear()

	partNum := 0
	mutex := &sync.Mutex{}

	svc.Handlers.Send.PushBack(func(r *request.Request) {
		mutex.Lock()
		defer mutex.Unlock()

		r.HTTPResponse = &http.Response{
			StatusCode: 200,
			Body:       ioutil.NopCloser(bytes.NewReader([]byte(respMsg))),
		}

		switch data := r.Data.(type) {
		case *s3.CreateMultipartUploadOutput:
			data.UploadId = aws.String("UPLOAD-ID")
		case *s3.UploadPartOutput:
			input := r.Params.(*s3.UploadPartInput)
			// With Concurrency set to 5 this handler may run on goroutines
			// other than the test's own, where t.Fatal (FailNow) must not be
			// called; record the failure with t.Error instead.
			if input.SSECustomerAlgorithm == nil {
				t.Error("SSECustomerAlgorithm should not be nil")
			}
			if input.SSECustomerKey == nil {
				t.Error("SSECustomerKey should not be nil")
			}
			partNum++
			data.ETag = aws.String(fmt.Sprintf("ETAG%d", partNum))
		case *s3.CompleteMultipartUploadOutput:
			data.Location = aws.String("https://location")
			data.VersionId = aws.String("VERSION-ID")
		case *s3.PutObjectOutput:
			data.VersionId = aws.String("VERSION-ID")
		}
	})

	mgr := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
		u.Concurrency = 5
	})

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket:               aws.String("Bucket"),
		Key:                  aws.String("Key"),
		SSECustomerAlgorithm: aws.String("AES256"),
		SSECustomerKey:       aws.String("foo"),
		Body:                 bytes.NewBuffer(make([]byte, 1024*1024*10)),
	})
	if err != nil {
		t.Fatalf("Expected no error, but received %v", err)
	}
}
func TestUploadWithContextCanceled(t *testing.T) {
u := s3manager.NewUploader(unit.Session)
params := s3manager.UploadInput{
Bucket: aws.String("Bucket"),
Key: aws.String("Key"),
Body: bytes.NewReader(make([]byte, 0)),
}
ctx := &awstesting.FakeContext{DoneCh: make(chan struct{})}
ctx.Error = fmt.Errorf("context canceled")
close(ctx.DoneCh)
_, err := u.UploadWithContext(ctx, ¶ms)
if err == nil {
t.Fatalf("expected error, did not get one")
}
aerr := err.(awserr.Error)
if e, a := request.CanceledErrorCode, aerr.Code(); e != a {
t.Errorf("expected error code %q, got %q", e, a)
}
if e, a := "canceled", aerr.Message(); !strings.Contains(a, e) {
t.Errorf("expected error message to contain %q, but did not %q", e, a)
}
}
// TestUploadMaxPartsEOF is a regression test: the S3 Uploader incorrectly
// failed an upload if the content being uploaded had a size of exactly
// MinPartSize * MaxUploadParts.
// Github: aws/aws-sdk-go#2557
func TestUploadMaxPartsEOF(t *testing.T) {
	s, ops, _ := loggingSvc(emptyList)
	mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {
		u.Concurrency = 1
		u.PartSize = s3manager.DefaultUploadPartSize
		u.MaxUploadParts = 2
	})

	f := bytes.NewReader(make([]byte, int(mgr.PartSize)*mgr.MaxUploadParts))

	// io.NewSectionReader takes (reader, offset, length), so each half of f
	// is exactly one part long. Wrapping both in a MultiReader hides the
	// seekable reader from the uploader.
	r1 := io.NewSectionReader(f, 0, s3manager.DefaultUploadPartSize)
	r2 := io.NewSectionReader(f, s3manager.DefaultUploadPartSize, s3manager.DefaultUploadPartSize)
	body := io.MultiReader(r1, r2)

	_, err := mgr.Upload(&s3manager.UploadInput{
		Bucket: aws.String("Bucket"),
		Key:    aws.String("Key"),
		Body:   body,
	})
	if err != nil {
		t.Fatalf("expect no error, got %v", err)
	}

	expectOps := []string{
		"CreateMultipartUpload",
		"UploadPart",
		"UploadPart",
		"CompleteMultipartUpload",
	}
	if e, a := expectOps, *ops; !reflect.DeepEqual(e, a) {
		t.Errorf("expect %v ops, got %v", e, a)
	}
}
// createTempFile creates a temporary file of the given size in os.TempDir,
// named after the SDK and the running test.
//
// On success it returns the open file and a cleanup function that closes and
// removes it, reporting any failure on the *testing.T it is given. On failure
// it returns a nil file, a nil cleanup function and the error; a file that
// was created but could not be resized is closed and removed first.
func createTempFile(t *testing.T, size int64) (*os.File, func(*testing.T), error) {
	file, err := ioutil.TempFile(os.TempDir(), aws.SDKName+t.Name())
	if err != nil {
		return nil, nil, err
	}
	filename := file.Name()

	if err := file.Truncate(size); err != nil {
		// Best-effort cleanup so the half-built file is not leaked; the
		// Truncate error is the one worth reporting.
		file.Close()
		os.Remove(filename)
		return nil, nil, err
	}

	cleanup := func(t *testing.T) {
		if err := file.Close(); err != nil {
			t.Errorf("failed to close temp file, %s, %v", filename, err)
		}
		if err := os.Remove(filename); err != nil {
			t.Errorf("failed to remove temp file, %s, %v", filename, err)
		}
	}
	return file, cleanup, nil
}
// buildFailHandlers returns parts handlers, one per upload part. Each is a
// failPartHandler initialised with retry failures remaining and a
// successPartHandler to fall back on.
func buildFailHandlers(tb testing.TB, parts, retry int) []http.Handler {
	handlers := make([]http.Handler, parts)
	for idx := range handlers {
		handlers[idx] = &failPartHandler{
			tb:             tb,
			failsRemaining: retry,
			successHandler: successPartHandler{tb: tb},
		}
	}
	return handlers
}
// TestUploadRetry uploads a three-part body against a mock server whose part
// handlers are built by buildFailHandlers with 10 failures each, and expects
// the upload to succeed with MaxRetries set to 11. The same scenario is run
// for a bytes.Buffer, a bytes.Reader and an os.File body.
func TestUploadRetry(t *testing.T) {
	const numParts, retries = 3, 10

	testFile, testFileCleanup, err := createTempFile(t, s3manager.DefaultUploadPartSize*numParts)
	if err != nil {
		t.Fatalf("failed to create test file, %v", err)
	}
	defer testFileCleanup(t)

	// Every case uses the same handler factory.
	failingParts := func(tb testing.TB) []http.Handler {
		return buildFailHandlers(tb, numParts, retries)
	}

	type retryCase struct {
		Body         io.Reader
		PartHandlers func(testing.TB) []http.Handler
	}
	cases := map[string]retryCase{
		"bytes.Buffer": {
			Body:         bytes.NewBuffer(make([]byte, s3manager.DefaultUploadPartSize*numParts)),
			PartHandlers: failingParts,
		},
		"bytes.Reader": {
			Body:         bytes.NewReader(make([]byte, s3manager.DefaultUploadPartSize*numParts)),
			PartHandlers: failingParts,
		},
		"os.File": {
			Body:         testFile,
			PartHandlers: failingParts,
		},
	}

	for name, c := range cases {
		t.Run(name, func(t *testing.T) {
			server := httptest.NewServer(newMockS3UploadServer(t, c.PartHandlers(t)))
			defer server.Close()

			cfg := &aws.Config{
				Endpoint:         aws.String(server.URL),
				S3ForcePathStyle: aws.Bool(true),
				DisableSSL:       aws.Bool(true),
				MaxRetries:       aws.Int(retries + 1),
				// Retries must not actually sleep.
				SleepDelay: func(time.Duration) {},
				//Credentials: credentials.AnonymousCredentials,
			}
			// Request logging is only switched on when DEBUG_BODY is set.
			if len(os.Getenv("DEBUG_BODY")) != 0 {
				cfg.Logger = t
				cfg.LogLevel = aws.LogLevel(
					aws.LogDebugWithRequestErrors | aws.LogDebugWithRequestRetries,
				)
			}
			sess := unit.Session.Copy(cfg)

			uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
				// u.Concurrency = 1
			})

			input := &s3manager.UploadInput{
				Bucket: aws.String("bucket"),
				Key:    aws.String("key"),
				Body:   c.Body,
			}
			if _, err := uploader.Upload(input); err != nil {
				t.Fatalf("expect no error, got %v", err)
			}
		})
	}
}
// TestUploadBufferStrategy uploads bodies of various sizes with and without
// a BufferProvider. When a recordedBufferProvider is used, the content it
// buffered must equal the uploaded bytes and its callback count must match
// the expected number.
func TestUploadBufferStrategy(t *testing.T) {
	type strategyCase struct {
		PartSize  int64
		Size      int64
		Strategy  s3manager.ReadSeekerWriteToProvider
		callbacks int
	}
	cases := map[string]strategyCase{
		"NoBuffer": {
			PartSize: s3manager.DefaultUploadPartSize,
			Strategy: nil,
		},
		"SinglePart": {
			PartSize:  s3manager.DefaultUploadPartSize,
			Size:      s3manager.DefaultUploadPartSize,
			Strategy:  &recordedBufferProvider{size: int(s3manager.DefaultUploadPartSize)},
			callbacks: 1,
		},
		"MultiPart": {
			PartSize:  s3manager.DefaultUploadPartSize,
			Size:      s3manager.DefaultUploadPartSize * 2,
			Strategy:  &recordedBufferProvider{size: int(s3manager.DefaultUploadPartSize)},
			callbacks: 2,
		},
	}

	// mockSend drains the request body, answers 200 and fills in the output
	// fields for the operation at hand. It keeps no state, so every sub-test
	// installs the same function.
	mockSend := func(r *request.Request) {
		if r.Body != nil {
			io.Copy(ioutil.Discard, r.Body)
		}

		r.HTTPResponse = &http.Response{
			StatusCode: 200,
			Body:       ioutil.NopCloser(bytes.NewReader([]byte(respMsg))),
		}

		switch out := r.Data.(type) {
		case *s3.CreateMultipartUploadOutput:
			out.UploadId = aws.String("UPLOAD-ID")
		case *s3.UploadPartOutput:
			out.ETag = aws.String(fmt.Sprintf("ETAG%d", random.Int()))
		case *s3.CompleteMultipartUploadOutput:
			out.Location = aws.String("https://location")
			out.VersionId = aws.String("VERSION-ID")
		case *s3.PutObjectOutput:
			out.VersionId = aws.String("VERSION-ID")
		}
	}

	for name, tCase := range cases {
		t.Run(name, func(t *testing.T) {
			svc := s3.New(unit.Session.Copy())
			svc.Handlers.Unmarshal.Clear()
			svc.Handlers.UnmarshalMeta.Clear()
			svc.Handlers.UnmarshalError.Clear()
			svc.Handlers.Send.Clear()
			svc.Handlers.Send.PushBack(mockSend)

			uploader := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
				u.PartSize = tCase.PartSize
				u.BufferProvider = tCase.Strategy
				u.Concurrency = 1
			})

			expected := s3testing.GetTestBytes(int(tCase.Size))
			input := &s3manager.UploadInput{
				Bucket: aws.String("bucket"),
				Key:    aws.String("key"),
				Body:   bytes.NewReader(expected),
			}
			if _, err := uploader.Upload(input); err != nil {
				t.Fatalf("failed to upload file: %v", err)
			}

			// Only the recording provider has anything to verify.
			strat, recorded := tCase.Strategy.(*recordedBufferProvider)
			if !recorded {
				return
			}
			if !bytes.Equal(expected, strat.content) {
				t.Errorf("content buffered did not match expected")
			}
			if tCase.callbacks != strat.callbackCount {
				t.Errorf("expected %v, got %v callbacks", tCase.callbacks, strat.callbackCount)
			}
		})
	}
}
// TestUploaderValidARN runs Upload against a plain bucket name and several
// ARN-style bucket values and checks only whether an error is returned. Of
// the inputs in the table, the s3-object-lambda access point ARN is the one
// expected to be rejected.
func TestUploaderValidARN(t *testing.T) {
	const region = "us-west-2"

	// newInput builds an upload request for bucket with its own body reader.
	newInput := func(bucket string) s3manager.UploadInput {
		return s3manager.UploadInput{
			Bucket: aws.String(bucket),
			Key:    aws.String("test-key"),
			Body:   bytes.NewReader([]byte("test body content")),
		}
	}

	type testCase struct {
		input   s3manager.UploadInput
		wantErr bool
	}

	tests := map[string]testCase{
		"standard bucket": {
			input: newInput("test-bucket"),
		},
		"accesspoint": {
			input: newInput("arn:aws:s3:us-west-2:123456789012:accesspoint/myap"),
		},
		"outpost accesspoint": {
			input: newInput("arn:aws:s3-outposts:us-west-2:012345678901:outpost/op-1234567890123456/accesspoint/myaccesspoint"),
		},
		"s3-object-lambda accesspoint": {
			input:   newInput("arn:aws:s3-object-lambda:us-west-2:123456789012:accesspoint/myap"),
			wantErr: true,
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			// Each subtest gets its own mocked client pinned to the region
			// used by the ARNs above.
			svc, _, _ := loggingSvc(nil)
			svc.Config.Region = aws.String(region)
			svc.ClientInfo.SigningRegion = region

			_, err := s3manager.NewUploaderWithClient(svc).Upload(&tc.input)
			if gotErr := err != nil; gotErr != tc.wantErr {
				t.Errorf("err: %v, wantErr: %v", err, tc.wantErr)
			}
		})
	}
}
// mockS3UploadServer is an HTTP handler used to mock the S3 multipart upload
// endpoints in tests. newMockS3UploadServer registers handleRequest on the
// embedded ServeMux for the "/" pattern.
type mockS3UploadServer struct {
*http.ServeMux
// tb is the testing.TB given to newMockS3UploadServer.
tb testing.TB
// partHandler holds the UploadPart handlers; a request for part number N
// is served by partHandler[N-1].
partHandler []http.Handler
}
// newMockS3UploadServer returns a mock upload server whose mux sends every
// request path to handleRequest. partHandler supplies the per-part handlers
// used for UploadPart requests; tb is stored on the returned server.
func newMockS3UploadServer(tb testing.TB, partHandler []http.Handler) *mockS3UploadServer {
	mux := http.NewServeMux()
	srv := &mockS3UploadServer{
		ServeMux:    mux,
		tb:          tb,
		partHandler: partHandler,
	}
	// The server is fully populated before the method value is bound.
	mux.HandleFunc("/", srv.handleRequest)
	return srv
}
// handleRequest mocks the S3 multipart upload operations by dispatching on the
// HTTP method and query string of r:
//
//   - POST with an "uploads" query key: CreateMultipartUpload, answered with
//     createUploadResp.
//   - PUT: UploadPart, delegated to the handler registered for the 1-based
//     "partNumber" query value.
//   - any other POST: CompleteMultipartUpload, answered with
//     completeUploadResp.
//   - DELETE: AbortMultipartUpload, answered with abortUploadResp.
//
// Any other method, an unparsable partNumber, or a partNumber with no
// registered handler is answered with a 400 BadRequest error via failRequest.
func (s mockS3UploadServer) handleRequest(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	_, hasUploads := r.URL.Query()["uploads"]

	switch {
	case r.Method == "POST" && hasUploads:
		// CreateMultipartUpload
		w.Header().Set("Content-Length", strconv.Itoa(len(createUploadResp)))
		w.Write([]byte(createUploadResp))

	case r.Method == "PUT":
		// UploadPart
		partNumStr := r.URL.Query().Get("partNumber")
		partNum, err := strconv.Atoi(partNumStr)
		if err != nil {
			failRequest(w, 400, "BadRequest",
				fmt.Sprintf("unable to parse partNumber, %q, %v",
					partNumStr, err))
			return
		}

		// Part numbers start at 1, partHandler is indexed from 0.
		idx := partNum - 1
		if idx < 0 || idx >= len(s.partHandler) {
			// Report the part number the client sent rather than the
			// internal slice index, so the message matches the request.
			failRequest(w, 400, "BadRequest",
				fmt.Sprintf("invalid partNumber %v", partNum))
			return
		}
		s.partHandler[idx].ServeHTTP(w, r)

	case r.Method == "POST":
		// CompleteMultipartUpload
		w.Header().Set("Content-Length", strconv.Itoa(len(completeUploadResp)))
		w.Write([]byte(completeUploadResp))

	case r.Method == "DELETE":
		// AbortMultipartUpload
		w.Header().Set("Content-Length", strconv.Itoa(len(abortUploadResp)))
		w.WriteHeader(200)
		w.Write([]byte(abortUploadResp))

	default:
		failRequest(w, 400, "BadRequest",
			fmt.Sprintf("invalid request %v %v", r.Method, r.URL))
	}
}
// failRequest replies with the given HTTP status and an error document built
// by formatting baseRequestErrorResp with code and msg, in that order. The
// Content-Length header is set to the length of the formatted document.
func failRequest(w http.ResponseWriter, status int, code, msg string) {
	body := []byte(fmt.Sprintf(baseRequestErrorResp, code, msg))

	// Headers must be set before the status line is written.
	w.Header().Set("Content-Length", strconv.Itoa(len(body)))
	w.WriteHeader(status)
	w.Write(body)
}
// successPartHandler is an UploadPart handler that accepts a part when the
// request body length matches its Content-Length header.
type successPartHandler struct {
// tb receives log output describing why a request was rejected.
tb testing.TB
}
// ServeHTTP accepts an UploadPart request when the number of body bytes read
// equals the value of the request's Content-Length header, replying with
// uploadPartResp. A body read error, an unparsable Content-Length header, or
// a length mismatch is answered with a 400 BadRequest; the latter two are
// also logged to h.tb.
func (h successPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	bodyLen, err := io.Copy(ioutil.Discard, r.Body)
	if err != nil {
		failRequest(w, 400, "BadRequest",
			fmt.Sprintf("failed to read body, %v", err))
		return
	}

	header := r.Header.Get("Content-Length")
	declared, err := strconv.ParseInt(header, 10, 64)

	switch {
	case err != nil:
		h.tb.Logf("expect content-length, got %q, %v", header, err)
		failRequest(w, 400, "BadRequest",
			fmt.Sprintf("unable to get content-length %v", err))

	case declared != bodyLen:
		h.tb.Logf("expect %v read, got %v", declared, bodyLen)
		failRequest(w, 400, "BadRequest",
			fmt.Sprintf(
				"content-length and body do not match, %v, %v", declared, bodyLen))

	default:
		w.Header().Set("Content-Length", strconv.Itoa(len(uploadPartResp)))
		w.Write([]byte(uploadPartResp))
	}
}
// failPartHandler is an UploadPart handler that answers with a 500 error
// until its failure budget is used up, then defers to successHandler.
type failPartHandler struct {
// tb is the test the handler belongs to.
tb testing.TB
// failsRemaining is decremented on every failed response; requests are
// passed to successHandler only while it is exactly zero.
failsRemaining int
// successHandler serves requests once failsRemaining reaches zero. When it
// is nil every request fails.
successHandler http.Handler
}
// ServeHTTP fails the request with a 500 InternalException and decrements
// failsRemaining, unless failsRemaining is zero and a successHandler is set,
// in which case the request is handed to successHandler untouched. With a nil
// successHandler every request fails and failsRemaining keeps decreasing.
func (h *failPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	if delegate := h.successHandler; delegate != nil && h.failsRemaining == 0 {
		delegate.ServeHTTP(w, r)
		return
	}

	// Consume the uploaded part before replying with the mock failure.
	io.Copy(ioutil.Discard, r.Body)

	partNum := r.URL.Query().Get("partNumber")
	failRequest(w, 500, "InternalException",
		fmt.Sprintf("mock error, partNumber %v", partNum))
	h.failsRemaining--
}
// recordedBufferProvider hands out buffered readers through GetWriteTo and
// records what the returned cleanup callbacks observe.
type recordedBufferProvider struct {
// content accumulates the buffers appended by each cleanup callback.
content []byte
// size is the length of the buffer allocated by every GetWriteTo call.
size int
// callbackCount is the number of cleanup callbacks that have run.
callbackCount int
}
// GetWriteTo wraps seeker in a buffered reader backed by a freshly allocated
// buffer of r.size bytes. The returned callback appends that whole buffer to
// r.content and increments r.callbackCount each time it is invoked.
func (r *recordedBufferProvider) GetWriteTo(seeker io.ReadSeeker) (s3manager.ReadSeekerWriteTo, func()) {
	buf := make([]byte, r.size)
	buffered := s3manager.NewBufferedReadSeeker(seeker, buf)

	// record captures the full buffer, regardless of how much of it was used.
	record := func() {
		r.callbackCount++
		r.content = append(r.content, buf...)
	}

	return &s3manager.BufferedReadSeekerWriteTo{BufferedReadSeeker: buffered}, record
}
// createUploadResp is the response body handleRequest writes for a
// CreateMultipartUpload request (a POST carrying an "uploads" query key).
const createUploadResp = `
bucket
key
abc123
`
// uploadPartResp is the response body successPartHandler writes once an
// UploadPart request has passed its content-length check.
const uploadPartResp = `
key
`
// baseRequestErrorResp is the fmt format string failRequest uses to build an
// error response body; its two %s verbs receive the error code and then the
// error message.
const baseRequestErrorResp = `
%s
%s
request-id
host-id
`
// completeUploadResp is the response body handleRequest writes for a
// CompleteMultipartUpload request (a POST without an "uploads" query key).
const completeUploadResp = `
bucket
key
key
https://bucket.us-west-2.amazonaws.com/key
abc123
`
// abortUploadResp is the response body handleRequest writes for an
// AbortMultipartUpload request (a DELETE).
const abortUploadResp = `
`