Skip to content

Commit

Permalink
Merge pull request #109 from depot/feat/load-attach-proxy
Browse files Browse the repository at this point in the history
feat(load): use docker attach to proxy docker pull
  • Loading branch information
goller authored Apr 29, 2023
2 parents 24f8482 + f4d0829 commit f215e04
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 109 deletions.
3 changes: 2 additions & 1 deletion pkg/buildx/commands/bake.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ func RunBake(dockerCli command.Cli, targets []string, in BakeOptions) (err error
}(i)
}

if err := eg.Wait(); err != nil {
err := eg.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
// For now, we will fallback by rebuilding with load.
if in.exportLoad {
progress.Write(printer, "[load] fast load failed; retrying", func() error { return err })
Expand Down
2 changes: 1 addition & 1 deletion pkg/buildx/commands/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, nodes []builder.No

// NOTE: the err is returned at the end of this function after the final prints.
err = load.DepotFastLoad(ctx, dockerCli.Client(), resp, pullOpts, printer)
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
// For now, we will fallback by rebuilding with load.
if exportLoad {
// We can only retry if neither the context nor dockerfile are stdin.
Expand Down
22 changes: 22 additions & 0 deletions pkg/load/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package load

import (
"math/rand"
"time"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// During a download of an image we temporarily store the image with this
// random name to avoid conflicts with any other images.
func RandImageName() string {
const letterBytes = "abcdefghijklmnopqrstuvwxyz"
name := make([]byte, 10)
for i := range name {
name[i] = letterBytes[rand.Intn(len(letterBytes))]
}

return string(name)
}
122 changes: 28 additions & 94 deletions pkg/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"runtime"
"strings"
"time"
Expand Down Expand Up @@ -52,10 +49,13 @@ func DepotFastLoad(ctx context.Context, dockerapi docker.APIClient, resp []depot
ProxyImage: pullOpt.ProxyImage,
}

// Start the depot CLI hosted registry and socat proxy.
var registry LocalRegistryProxy
// Start the depot registry proxy.
var registry *RegistryProxy
err = progress.Wrap("preparing to load", pw.Write, func(logger progress.SubLogger) error {
registry, err = NewLocalRegistryProxy(ctx, proxyOpts, dockerapi, contentClient, logger)
registry, err = NewRegistryProxy(ctx, proxyOpts, dockerapi, contentClient, logger)
if err != nil {
err = logger.Wrap(fmt.Sprintf("[registry] unable to start %s", err), func() error { return err })
}
return err
})
if err != nil {
Expand Down Expand Up @@ -179,7 +179,7 @@ func contentClient(ctx context.Context, nodeResponse depotbuild.DepotNodeRespons
return client.ContentClient(), nil
}

type LocalRegistryProxy struct {
type RegistryProxy struct {
// ImageToPull is the image that should be pulled.
ImageToPull string
// ProxyContainerID is the ID of the container that is proxying the registry.
Expand All @@ -193,67 +193,48 @@ type LocalRegistryProxy struct {
DockerAPI docker.APIClient
}

// NewLocalRegistryProxy creates a local registry proxy that can be used to pull images from
// NewRegistryProxy creates a registry proxy that can be used to pull images from
// buildkitd cache.
//
// This also handles docker for desktop issues that prevent the registry from being accessed directly
// by running a proxy container with socat forwarding to the running server.
// This also handles docker for desktop issues that prevent the registry from being
// accessed directly because the proxy is accessible by the docker daemon.
// The proxy registry translates pull requests into a custom protocol over
// stdin and stdout. We use this proprietary protocol as the Docker daemon itself
// my be remote and the only way to communicate with remote daemons is over `attach`.
//
// The running server and proxy container will be cleaned-up when Close() is called.
func NewLocalRegistryProxy(ctx context.Context, opts *ProxyOpts, dockerapi docker.APIClient, contentClient contentapi.ContentClient, logger progress.SubLogger) (LocalRegistryProxy, error) {
registryHandler := NewRegistry(contentClient, opts.RawConfig, opts.RawManifest, logger)

func NewRegistryProxy(ctx context.Context, opts *ProxyOpts, dockerapi docker.APIClient, contentClient contentapi.ContentClient, logger progress.SubLogger) (*RegistryProxy, error) {
ctx, cancel := context.WithCancel(ctx)
registryPort, err := serveRegistry(ctx, registryHandler)
if err != nil {
cancel()
return LocalRegistryProxy{}, err
}

proxyContainerID, proxyExposedPort, err := RunProxyImage(ctx, dockerapi, opts.ProxyImage, registryPort)
proxyContainer, err := RunProxyImage(ctx, dockerapi, opts.ProxyImage, opts.RawManifest, opts.RawConfig)
if err != nil {
cancel()
return LocalRegistryProxy{}, err
return nil, err
}

// Wait for the registry and the proxy to be ready.
dockerAccessibleHost := fmt.Sprintf("localhost:%s", proxyExposedPort)

maxWait := time.NewTimer(20 * time.Second)
var ready bool
for !ready {
ready = IsReady(ctx, dockerAccessibleHost)
if ready {
break
}

select {
case <-ctx.Done():
case <-time.After(100 * time.Millisecond):
case <-maxWait.C:
cancel()
return LocalRegistryProxy{}, errors.New("timed out waiting for registry to be ready")
}
}
transport := NewTransport(proxyContainer.Conn)
go func() {
// Canceling ctx will stop the transport.
_ = transport.Run(ctx, contentClient)
}()

randomImageName := RandImageName()
// The tag is only for the UX during a pull. The first line will be "pulling manifest".
tag := "manifest"
// Docker is able to pull from the proxyPort on localhost. The socat proxy
// forwards to the registry server running on the registryPort.
imageToPull := fmt.Sprintf("localhost:%s/%s:%s", proxyExposedPort, randomImageName, tag)
// Docker is able to pull from the proxyPort on localhost. The proxy
// forwards registry requests to the Transport over docker attach's stdin and stdout.
imageToPull := fmt.Sprintf("localhost:%s/%s:%s", proxyContainer.Port, randomImageName, tag)

return LocalRegistryProxy{
return &RegistryProxy{
ImageToPull: imageToPull,
ProxyContainerID: proxyContainerID,
ProxyContainerID: proxyContainer.ID,
Cancel: cancel,
DockerAPI: dockerapi,
}, nil
}

// Close will stop the registry server and remove the proxy container if it was created.
func (l *LocalRegistryProxy) Close(ctx context.Context) error {
l.Cancel()
func (l *RegistryProxy) Close(ctx context.Context) error {
l.Cancel() // This stops the serial transport.
return StopProxyContainer(ctx, l.DockerAPI, l.ProxyContainerID)
}

Expand Down Expand Up @@ -283,50 +264,3 @@ func chooseBestImageManifest(architecture string, index *ocispecs.Index) (ocispe

return ocispecs.Descriptor{}, errors.New("no manifests found")
}

// The registry can pull images from buildkitd's content store.
// Cancel the context to stop the registry.
func serveRegistry(ctx context.Context, registry *Registry) (int, error) {
listener, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
return 0, err
}

server := &http.Server{
Handler: registry,
}

go func() {
<-ctx.Done()
_ = server.Shutdown(ctx)
}()

go func() {
_ = server.Serve(listener)
}()

return listener.Addr().(*net.TCPAddr).Port, nil
}

// During a download of an image we temporarily store the image with this
// random name to avoid conflicts with any other images.
func RandImageName() string {
const letterBytes = "abcdefghijklmnopqrstuvwxyz"
name := make([]byte, 10)
for i := range name {
name[i] = letterBytes[rand.Intn(len(letterBytes))]
}

return string(name)
}

// IsReady checks if the registry is ready to be used.
func IsReady(ctx context.Context, addr string) bool {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+addr+"/v2/", nil)
_, err := http.DefaultClient.Do(req)

return err == nil
}
46 changes: 34 additions & 12 deletions pkg/load/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package load

import (
"context"
"encoding/base64"
"fmt"
"io"
"net"
"sync"

"github.com/docker/docker/api/types"
Expand All @@ -13,13 +15,19 @@ import (
"github.com/docker/go-connections/nat"
)

const DefaultProxyImageName = "ghcr.io/depot/helper:1"
const DefaultProxyImageName = "ghcr.io/depot/helper:2.0.0"

type ProxyContainer struct {
ID string
Port string
Conn net.Conn
}

// Runs a proxy container via the docker API so that the docker daemon can pull from the local depot registry.
// This is specifically to handle docker for desktop running in a VM restricting access to the host network.
func RunProxyImage(ctx context.Context, dockerapi docker.APIClient, proxyImage string, registryPort int) (string, string, error) {
func RunProxyImage(ctx context.Context, dockerapi docker.APIClient, proxyImage string, rawManifest, rawConfig []byte) (*ProxyContainer, error) {
if err := PullProxyImage(ctx, dockerapi, proxyImage); err != nil {
return "", "", err
return nil, err
}

resp, err := dockerapi.ContainerCreate(ctx,
Expand All @@ -28,11 +36,15 @@ func RunProxyImage(ctx context.Context, dockerapi docker.APIClient, proxyImage s
ExposedPorts: nat.PortSet{
nat.Port("8888/tcp"): struct{}{},
},
Cmd: []string{
"socat",
"TCP-LISTEN:8888,fork",
fmt.Sprintf("TCP:host.docker.internal:%d", registryPort),
AttachStdin: true,
AttachStdout: true,
OpenStdin: true,
StdinOnce: true,
Env: []string{
fmt.Sprintf("MANIFEST=%s", base64.StdEncoding.EncodeToString(rawManifest)),
fmt.Sprintf("CONFIG=%s", base64.StdEncoding.EncodeToString(rawConfig)),
},
Cmd: []string{"/srv/helper"},
},
&container.HostConfig{
PublishAllPorts: true,
Expand All @@ -46,32 +58,42 @@ func RunProxyImage(ctx context.Context, dockerapi docker.APIClient, proxyImage s
)

if err != nil {
return "", "", err
return nil, err
}

if err := dockerapi.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return "", "", err
return nil, err
}

inspect, err := dockerapi.ContainerInspect(ctx, resp.ID)
if err != nil {
return "", "", err
return nil, err
}
binds := inspect.NetworkSettings.Ports[nat.Port("8888/tcp")]
var proxyPortOnHost string
for _, bind := range binds {
proxyPortOnHost = bind.HostPort
}

return resp.ID, proxyPortOnHost, nil
attach, err := dockerapi.ContainerAttach(ctx, resp.ID, types.ContainerAttachOptions{Stdin: true, Stdout: true, Logs: true, Stream: true})

if err != nil {
return nil, err
}

return &ProxyContainer{
ID: resp.ID,
Port: proxyPortOnHost,
Conn: attach.Conn,
}, nil
}

var (
downloadedProxyImage sync.Once
downloadProxyImageErr error
)

// PullProxyImage will pull the socat proxy image into docker.
// PullProxyImage will pull the proxy image into docker.
// This is done once per process as a performance optimization.
// Additionally, if the proxy image is already present, this will not pull the image.
func PullProxyImage(ctx context.Context, dockerapi docker.APIClient, imageName string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/load/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ImagePullPrivileged(ctx context.Context, dockerapi docker.APIClient, imageN
return nil
}

func printPull(ctx context.Context, rc io.ReadCloser, l progress.SubLogger) error {
func printPull(ctx context.Context, rc io.Reader, l progress.SubLogger) error {
started := map[string]*client.VertexStatus{}

defer func() {
Expand Down
Loading

0 comments on commit f215e04

Please sign in to comment.