Skip to content

Commit

Permalink
Merge pull request #363 from 88labs/enhancement/go-module-update
Browse files Browse the repository at this point in the history
go module update
  • Loading branch information
tomtwinkle authored Nov 22, 2023
2 parents 972ba3b + 8e07e64 commit c01fcd5
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 109 deletions.
2 changes: 1 addition & 1 deletion aws/awss3/awss3.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func SelectCSVAll(ctx context.Context, region awsconfig.Region, bucketName Bucke
},
},
}
if c.SkipByteSize > 0 {
if c.SkipByteSize != nil {
req.ScanRange = &types.ScanRange{Start: c.SkipByteSize}
}
resp, err := client.SelectObjectContent(ctx, req)
Expand Down
12 changes: 6 additions & 6 deletions aws/awss3/awss3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestHeadObject(t *testing.T) {
key := createFixture(100)
res, err := awss3.HeadObject(ctx, TestRegion, TestBucket, key)
assert.NoError(t, err)
assert.Equal(t, int64(100), res.ContentLength)
assert.Equal(t, aws.Int64(100), res.ContentLength)
})
t.Run("not exists object", func(t *testing.T) {
t.Parallel()
Expand All @@ -83,7 +83,7 @@ func TestHeadObject(t *testing.T) {
s3head.WithTimeout(5*time.Second),
)
assert.NoError(t, err)
assert.Equal(t, int64(100), res.ContentLength)
assert.Equal(t, aws.Int64(100), res.ContentLength)
})
t.Run("not exists object use Waiter", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -781,7 +781,7 @@ func TestSelectCSVAll(t *testing.T) {
key := createFixture(ctx, src)
var buf bytes.Buffer
err := awss3.SelectCSVAll(ctx, TestRegion, TestBucket, key, awss3.SelectCSVAllQuery, &buf,
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: true}),
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: aws.Bool(true)}),
)
if !assert.NoError(t, err) {
return
Expand All @@ -803,7 +803,7 @@ func TestSelectCSVAll(t *testing.T) {
key := createFixture(ctx, src)
var buf bytes.Buffer
err := awss3.SelectCSVAll(ctx, TestRegion, TestBucket, key, awss3.SelectCSVAllQuery, &buf,
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: true}),
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: aws.Bool(true)}),
)
if !assert.NoError(t, err) {
return
Expand All @@ -825,7 +825,7 @@ func TestSelectCSVAll(t *testing.T) {
key := createFixture(ctx, src)
var buf bytes.Buffer
err := awss3.SelectCSVAll(ctx, TestRegion, TestBucket, key, awss3.SelectCSVAllQuery, &buf,
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: true}),
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: aws.Bool(true)}),
)
if !assert.NoError(t, err) {
return
Expand All @@ -847,7 +847,7 @@ func TestSelectCSVAll(t *testing.T) {
key := createFixture(ctx, src)
var buf bytes.Buffer
err := awss3.SelectCSVAll(ctx, TestRegion, TestBucket, key, awss3.SelectCSVAllQuery, &buf,
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: true}),
s3selectcsv.WithCSVInput(types.CSVInput{AllowQuotedRecordDelimiter: aws.Bool(true)}),
)

if !assert.NoError(t, err) {
Expand Down
90 changes: 73 additions & 17 deletions aws/awss3/client.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,48 @@
package awss3

import (
"bytes"
"context"
"encoding/gob"
"fmt"
"net"
"net/url"
"sync"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
smithyendpoints "github.com/aws/smithy-go/endpoints"

"github.com/88labs/go-utils/aws/awsconfig"
"github.com/88labs/go-utils/aws/awss3/options/global/s3dialer"
"github.com/88labs/go-utils/aws/ctxawslocal"
)

// GlobalDialer Global http dialer settings for awss3 library
var GlobalDialer *s3dialer.ConfGlobalDialer
var (
// GlobalDialer Global http dialer settings for awss3 library
GlobalDialer *s3dialer.ConfGlobalDialer

customMu sync.Mutex
customEndpoint string
customEndpointClient *s3.Client
)

// GetClient
// Get s3 client for aws-sdk-go v2.
// Using ctxawslocal.WithContext, you can make requests for local mocks
func GetClient(ctx context.Context, region awsconfig.Region) (*s3.Client, error) {
if localProfile, ok := getLocalEndpoint(ctx); ok {
return getClientLocal(ctx, *localProfile)
customMu.Lock()
defer customMu.Unlock()
var err error
if customEndpointClient != nil {
return customEndpointClient, err
}
customEndpointClient, err = getClientLocal(ctx, *localProfile)
return customEndpointClient, err
}
awsHttpClient := awshttp.NewBuildableClient()
if GlobalDialer != nil {
Expand Down Expand Up @@ -52,20 +70,47 @@ func GetClient(ctx context.Context, region awsconfig.Region) (*s3.Client, error)
return s3.NewFromConfig(awsCfg), nil
}

type staticResolver struct{}

// ResolveEndpoint
// Local test mocks endpoints to connect to minio
//
// FIXME: EndpointResolverWithOptionsFunc substitutes staticResolver for endpoint mock
func (*staticResolver) ResolveEndpoint(_ context.Context, p s3.EndpointParameters) (
smithyendpoints.Endpoint, error,
) {
if customEndpoint != "" {
endpoint, err := url.Parse(customEndpoint)
if err != nil {
return smithyendpoints.Endpoint{}, fmt.Errorf("unable to parse endpoint, %w", err)
}
if p.Bucket != nil {
endpoint = endpoint.JoinPath(*p.Bucket)
}
// This value will be used as-is when making the request.
return smithyendpoints.Endpoint{
URI: *endpoint,
}, nil
}
return smithyendpoints.Endpoint{}, &aws.EndpointNotFoundError{}
}

func getClientLocal(ctx context.Context, localProfile LocalProfile) (*s3.Client, error) {
// FIXME: EndpointResolverWithOptionsFunc substitutes staticResolver for endpoint mock
// because HostnameImmutable is not enabled. (github.com/aws/aws-sdk-go-v2/config v1.25.4)
// https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/endpoints/
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if service == s3.ServiceID {
return aws.Endpoint{
PartitionID: "aws",
URL: localProfile.Endpoint,
SigningRegion: region,
HostnameImmutable: true,
}, nil
}
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
//customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
// if service == s3.ServiceID {
// return aws.Endpoint{
// PartitionID: "aws",
// URL: localProfile.Endpoint,
// SigningRegion: region,
// HostnameImmutable: true,
// }, nil
// }
// // returning EndpointNotFoundError will allow the service to fallback to it's default resolution
// return aws.Endpoint{}, &aws.EndpointNotFoundError{}
//})
awsHttpClient := awshttp.NewBuildableClient()
if GlobalDialer != nil {
awsHttpClient.WithDialerOptions(func(dialer *net.Dialer) {
Expand All @@ -82,7 +127,7 @@ func getClientLocal(ctx context.Context, localProfile LocalProfile) (*s3.Client,
}
awsCfg, err := awsConfig.LoadDefaultConfig(ctx,
awsConfig.WithHTTPClient(awsHttpClient),
awsConfig.WithEndpointResolverWithOptions(customResolver),
//awsConfig.WithEndpointResolverWithOptions(customResolver),
awsConfig.WithCredentialsProvider(credentials.StaticCredentialsProvider{
Value: aws.Credentials{
AccessKeyID: localProfile.AccessKey,
Expand All @@ -94,7 +139,10 @@ func getClientLocal(ctx context.Context, localProfile LocalProfile) (*s3.Client,
if err != nil {
return nil, fmt.Errorf("unable to load SDK config, %w", err)
}
return s3.NewFromConfig(awsCfg), nil
customEndpoint = localProfile.Endpoint
return s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.EndpointResolverV2 = &staticResolver{}
}), nil
}

type LocalProfile struct {
Expand All @@ -115,3 +163,11 @@ func getLocalEndpoint(ctx context.Context) (*LocalProfile, bool) {
}
return nil, false
}

func hash(v LocalProfile) ([]byte, error) {
var b bytes.Buffer
if err := gob.NewEncoder(&b).Encode(v); err != nil {
return nil, err
}
return b.Bytes(), nil
}
2 changes: 1 addition & 1 deletion aws/awss3/options/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ func TestGlobalOptionWithHeadObject(t *testing.T) {
awss3.GlobalDialer = dialer
res, err := awss3.HeadObject(ctx, TestRegion, TestBucket, key)
assert.NoError(t, err)
assert.Equal(t, int64(100), res.ContentLength)
assert.Equal(t, aws.Int64(100), res.ContentLength)
})
}
5 changes: 3 additions & 2 deletions aws/awss3/options/s3selectcsv/s3_select_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type OptionS3SelectCSV interface {
}

type confS3SelectCSV struct {
SkipByteSize int64
SkipByteSize *int64
CSVInput types.CSVInput
CompressionType types.CompressionType
CSVOutput types.CSVOutput
Expand All @@ -28,7 +28,8 @@ func GetS3SelectCSVConf(opts ...OptionS3SelectCSV) confS3SelectCSV {
type OptionSkipByteSize int64

func (o OptionSkipByteSize) Apply(c *confS3SelectCSV) {
c.SkipByteSize = int64(o)
v := int64(o)
c.SkipByteSize = &v
}

// WithSkipByteSize
Expand Down
2 changes: 1 addition & 1 deletion aws/awssqs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func GetClient(ctx context.Context, region awsconfig.Region) (*sqs.Client, error
if localProfile, ok := getLocalEndpoint(ctx); ok {
return getClientLocal(ctx, *localProfile)
}
// S3 Client
// SQS Client
awsCfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithRegion(region.String()))
if err != nil {
return nil, fmt.Errorf("unable to load SDK config, %w", err)
Expand Down
2 changes: 2 additions & 0 deletions aws/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ services:
/bin/sh -c "
/usr/bin/mc config host add s3 http://minio:29000 DUMMYACCESSKEYEXAMPLE DUMMYSECRETKEYEXAMPLE;
/usr/bin/mc --quiet mb s3/test;
/usr/bin/mc --quiet anonymous set upload s3/test;
/usr/bin/mc --quiet anonymous set download s3/test;
/usr/bin/mc --quiet policy set-json /policies/policy_test.json s3/test;
exit 0;
"
Expand Down
52 changes: 26 additions & 26 deletions aws/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ go 1.21
require (
github.com/88labs/go-utils/ulid v0.3.0
github.com/88labs/go-utils/utf8bom v0.4.0
github.com/aws/aws-sdk-go-v2 v1.22.2
github.com/aws/aws-sdk-go-v2/config v1.22.3
github.com/aws/aws-sdk-go-v2/credentials v1.15.2
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.12.1
github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.6.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.13.4
github.com/aws/aws-sdk-go-v2/service/cognitoidentity v1.20.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.25.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.42.1
github.com/aws/aws-sdk-go-v2/service/sqs v1.28.0
github.com/aws/smithy-go v1.16.0
github.com/aws/aws-sdk-go-v2 v1.23.1
github.com/aws/aws-sdk-go-v2/config v1.25.4
github.com/aws/aws-sdk-go-v2/credentials v1.16.3
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.12.3
github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.6.3
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.1
github.com/aws/aws-sdk-go-v2/service/cognitoidentity v1.20.3
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.25.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.1
github.com/aws/aws-sdk-go-v2/service/sqs v1.28.2
github.com/aws/smithy-go v1.17.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/go-faker/faker/v4 v4.2.0
github.com/stretchr/testify v1.8.4
Expand All @@ -25,21 +25,21 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.5.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.2 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.17.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.8.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.19.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.4 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.17.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
Loading

0 comments on commit c01fcd5

Please sign in to comment.