buildkit/exporter/local/export.go

148 lines
3.0 KiB
Go

package local
import (
"context"
"io/ioutil"
"os"
"strings"
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
type Opt struct {
SessionManager *session.Manager
}
type localExporter struct {
opt Opt
// session manager
}
func New(opt Opt) (exporter.Exporter, error) {
le := &localExporter{opt: opt}
return le, nil
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
li := &localExporterInstance{localExporter: e, caller: caller}
return li, nil
}
type localExporterInstance struct {
*localExporter
caller session.Caller
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
isMap := len(inp.Refs) > 0
export := func(ctx context.Context, k string, ref cache.ImmutableRef) func() error {
return func() error {
var src string
var err error
if ref == nil {
src, err = ioutil.TempDir("", "buildkit")
if err != nil {
return err
}
defer os.RemoveAll(src)
} else {
mount, err := ref.Mount(ctx, true)
if err != nil {
return err
}
lm := snapshot.LocalMounter(mount)
src, err = lm.Mount()
if err != nil {
return err
}
defer lm.Unmount()
}
fs := fsutil.NewFS(src, nil)
lbl := "copying files"
if isMap {
lbl += " " + k
fs = fsutil.SubDirFS(fs, fsutil.Stat{
Mode: uint32(os.ModeDir | 0755),
Path: strings.Replace(k, "/", "_", -1),
})
}
progress := newProgressHandler(ctx, lbl)
if err := filesync.CopyToCaller(ctx, fs, e.caller, progress); err != nil {
return err
}
return nil
}
}
eg, ctx := errgroup.WithContext(ctx)
if isMap {
for k, ref := range inp.Refs {
eg.Go(export(ctx, k, ref))
}
} else {
eg.Go(export(ctx, "", inp.Ref))
}
if err := eg.Wait(); err != nil {
return nil, err
}
return nil, nil
}
func newProgressHandler(ctx context.Context, id string) func(int, bool) {
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
Action: "transferring",
}
pw.Write(id, st)
return func(s int, last bool) {
if last || limiter.Allow() {
st.Current = s
if last {
now := time.Now()
st.Completed = &now
}
pw.Write(id, st)
if last {
pw.Close()
}
}
}
}