exporter: add local exporter

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2017-08-03 15:24:02 -07:00
parent 40d15aa14b
commit 9f6d9a9e78
22 changed files with 446 additions and 107 deletions

8
client/exporters.go Normal file
View File

@ -0,0 +1,8 @@
package client
const (
ExporterImage = "image"
ExporterLocal = "local"
exporterLocalOutputDir = "output"
)

View File

@ -65,6 +65,14 @@ func (c *Client) Solve(ctx context.Context, r io.Reader, opt SolveOpt, statusCha
s.Allow(filesync.NewFSSyncProvider(syncedDirs))
}
if opt.Exporter == ExporterLocal {
outputDir, ok := opt.ExporterAttrs[exporterLocalOutputDir]
if !ok {
return errors.Errorf("output directory is required for local exporter")
}
s.Allow(filesync.NewFSSyncTarget(outputDir))
}
eg.Go(func() error {
return s.Run(ctx, grpchijack.Dialer(c.controlClient()))
})

View File

@ -82,6 +82,8 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
return nil, errors.Wrap(err, "failed to load")
}
ctx = session.NewContext(ctx, req.Session)
var expi exporter.ExporterInstance
if req.Exporter != "" {
exp, ok := c.opt.Exporters[req.Exporter]
@ -94,8 +96,6 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
}
}
ctx = session.NewContext(ctx, req.Session)
if err := c.solver.Solve(ctx, req.Ref, v, expi); err != nil {
return nil, err
}

View File

@ -12,8 +12,10 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/instructioncache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter"
imageexporter "github.com/moby/buildkit/exporter/containerimage"
localexporter "github.com/moby/buildkit/exporter/local"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot/blobmapping"
"github.com/moby/buildkit/source"
@ -22,8 +24,6 @@ import (
"github.com/moby/buildkit/source/local"
)
const keyImageExporter = "image"
type pullDeps struct {
Snapshotter ctdsnapshot.Snapshotter
ContentStore content.Store
@ -114,7 +114,15 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) {
if err != nil {
return nil, err
}
exporters[keyImageExporter] = imageExporter
exporters[client.ExporterImage] = imageExporter
localExporter, err := localexporter.New(localexporter.Opt{
SessionManager: sessm,
})
if err != nil {
return nil, err
}
exporters[client.ExporterLocal] = localExporter
return &Opt{
Snapshotter: snapshotter,

View File

@ -24,8 +24,7 @@ func main() {
flag.Parse()
bk := buildkit(opt)
out := bk.Run(llb.Shlex("ls -l /bin")) // debug output
out := bk
dt, err := out.Marshal()
if err != nil {
panic(err)
@ -89,17 +88,17 @@ func buildkit(opt buildOpt) llb.State {
buildctl := run(llb.Shlex("go build -o /out/buildctl ./cmd/buildctl"))
r := llb.Image("docker.io/library/alpine:latest").With(
copyAll(buildctl, "/bin"),
copyAll(runc(opt.runc), "/bin"),
r := llb.Scratch().With(
copyAll(buildctl, "/"),
copyAll(runc(opt.runc), "/"),
)
if opt.target == "containerd" {
return r.With(
copyAll(containerd(opt.containerd), "/bin"),
copyAll(builddContainerd, "/bin"))
copyAll(containerd(opt.containerd), "/"),
copyAll(builddContainerd, "/"))
}
return r.With(copyAll(builddStandalone, "/bin"))
return r.With(copyAll(builddStandalone, "/"))
}
func copyAll(src llb.State, destPath string) llb.StateOption {

98
exporter/local/export.go Normal file
View File

@ -0,0 +1,98 @@
package local
import (
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)
type Opt struct {
SessionManager *session.Manager
}
type localExporter struct {
opt Opt
// session manager
}
func New(opt Opt) (exporter.Exporter, error) {
le := &localExporter{opt: opt}
return le, nil
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
li := &localExporterInstance{localExporter: e, caller: caller}
return li, nil
}
type localExporterInstance struct {
*localExporter
caller session.Caller
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, ref cache.ImmutableRef) error {
mount, err := ref.Mount(ctx, true)
if err != nil {
return err
}
lm := snapshot.LocalMounter(mount)
dest, err := lm.Mount()
if err != nil {
return err
}
defer lm.Unmount()
progress := newProgressHandler(ctx, "copying files")
return filesync.CopyToCaller(ctx, dest, e.caller, progress)
}
func newProgressHandler(ctx context.Context, id string) func(int, bool) {
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
Action: "transferring",
}
pw.Write(id, st)
return func(s int, last bool) {
if last || limiter.Allow() {
st.Current = s
if last {
now := time.Now()
st.Completed = &now
}
pw.Write(id, st)
if last {
pw.Close()
}
}
}
}

View File

@ -1,6 +1,7 @@
package filesync
import (
"os"
"time"
"github.com/sirupsen/logrus"
@ -27,5 +28,27 @@ func recvDiffCopy(ds grpc.Stream, dest string, cu CacheUpdater, progress progres
cf = cu.HandleChange
ch = cu.ContentHasher()
}
return fsutil.Receive(ds.Context(), ds, dest, cf, ch, progress)
return fsutil.Receive(ds.Context(), ds, dest, fsutil.ReceiveOpt{
NotifyHashed: cf,
ContentHasher: ch,
ProgressCb: progress,
})
}
func syncTargetDiffCopy(ds grpc.Stream, dest string) error {
if err := os.MkdirAll(dest, 0700); err != nil {
return err
}
return fsutil.Receive(ds.Context(), ds, dest, fsutil.ReceiveOpt{
Merge: true,
Filter: func() func(*fsutil.Stat) bool {
uid := os.Getuid()
gid := os.Getgid()
return func(st *fsutil.Stat) bool {
st.Uid = uint32(uid)
st.Gid = uint32(gid)
return true
}
}(),
})
}

View File

@ -202,3 +202,39 @@ func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {
return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater, opt.ProgressCb)
}
// NewFSSyncTarget allows writing into a directory
func NewFSSyncTarget(outdir string) session.Attachable {
p := &fsSyncTarget{
outdir: outdir,
}
return p
}
type fsSyncTarget struct {
outdir string
}
func (sp *fsSyncTarget) Register(server *grpc.Server) {
RegisterFileSendServer(server, sp)
}
func (sp *fsSyncTarget) DiffCopy(stream FileSend_DiffCopyServer) error {
return syncTargetDiffCopy(stream, sp.outdir)
}
func CopyToCaller(ctx context.Context, srcPath string, c session.Caller, progress func(int, bool)) error {
method := session.MethodURL(_FileSend_serviceDesc.ServiceName, "diffcopy")
if !c.Supports(method) {
return errors.Errorf("method %s not supported by the client", method)
}
client := NewFileSendClient(c.Conn())
cc, err := client.DiffCopy(ctx)
if err != nil {
return err
}
return sendDiffCopy(cc, srcPath, nil, nil, progress)
}

View File

@ -277,6 +277,102 @@ var _FileSync_serviceDesc = grpc.ServiceDesc{
Metadata: "filesync.proto",
}
// Client API for FileSend service
type FileSendClient interface {
DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSend_DiffCopyClient, error)
}
type fileSendClient struct {
cc *grpc.ClientConn
}
func NewFileSendClient(cc *grpc.ClientConn) FileSendClient {
return &fileSendClient{cc}
}
func (c *fileSendClient) DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSend_DiffCopyClient, error) {
stream, err := grpc.NewClientStream(ctx, &_FileSend_serviceDesc.Streams[0], c.cc, "/moby.filesync.v1.FileSend/DiffCopy", opts...)
if err != nil {
return nil, err
}
x := &fileSendDiffCopyClient{stream}
return x, nil
}
type FileSend_DiffCopyClient interface {
Send(*BytesMessage) error
Recv() (*BytesMessage, error)
grpc.ClientStream
}
type fileSendDiffCopyClient struct {
grpc.ClientStream
}
func (x *fileSendDiffCopyClient) Send(m *BytesMessage) error {
return x.ClientStream.SendMsg(m)
}
func (x *fileSendDiffCopyClient) Recv() (*BytesMessage, error) {
m := new(BytesMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for FileSend service
type FileSendServer interface {
DiffCopy(FileSend_DiffCopyServer) error
}
func RegisterFileSendServer(s *grpc.Server, srv FileSendServer) {
s.RegisterService(&_FileSend_serviceDesc, srv)
}
func _FileSend_DiffCopy_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(FileSendServer).DiffCopy(&fileSendDiffCopyServer{stream})
}
type FileSend_DiffCopyServer interface {
Send(*BytesMessage) error
Recv() (*BytesMessage, error)
grpc.ServerStream
}
type fileSendDiffCopyServer struct {
grpc.ServerStream
}
func (x *fileSendDiffCopyServer) Send(m *BytesMessage) error {
return x.ServerStream.SendMsg(m)
}
func (x *fileSendDiffCopyServer) Recv() (*BytesMessage, error) {
m := new(BytesMessage)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _FileSend_serviceDesc = grpc.ServiceDesc{
ServiceName: "moby.filesync.v1.FileSend",
HandlerType: (*FileSendServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "DiffCopy",
Handler: _FileSend_DiffCopy_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "filesync.proto",
}
func (m *BytesMessage) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -558,7 +654,7 @@ var (
func init() { proto.RegisterFile("filesync.proto", fileDescriptorFilesync) }
var fileDescriptorFilesync = []byte{
// 198 bytes of a gzipped FileDescriptorProto
// 208 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xcb, 0xcc, 0x49,
0x2d, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0xc8, 0xcd, 0x4f, 0xaa,
0xd4, 0x83, 0x0b, 0x96, 0x19, 0x2a, 0x29, 0x71, 0xf1, 0x38, 0x55, 0x96, 0xa4, 0x16, 0xfb, 0xa6,
@ -566,10 +662,10 @@ var fileDescriptorFilesync = []byte{
0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x46, 0xab, 0x19, 0xb9, 0x38, 0xdc, 0x32, 0x73, 0x52, 0x83,
0x2b, 0xf3, 0x92, 0x85, 0xfc, 0xb8, 0x38, 0x5c, 0x32, 0xd3, 0xd2, 0x9c, 0xf3, 0x0b, 0x2a, 0x85,
0xe4, 0xf4, 0xd0, 0xcd, 0xd3, 0x43, 0x36, 0x4c, 0x8a, 0x80, 0xbc, 0x06, 0xa3, 0x01, 0xa3, 0x90,
0x3f, 0x17, 0x67, 0x48, 0x62, 0x51, 0x70, 0x49, 0x51, 0x6a, 0x62, 0x2e, 0x35, 0x0c, 0x74, 0x32,
0xbb, 0xf0, 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9,
0x31, 0xae, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e,
0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x51,
0x1c, 0x30, 0xb3, 0x92, 0xd8, 0xc0, 0x41, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x0c,
0x8d, 0xc5, 0x34, 0x01, 0x00, 0x00,
0x3f, 0x17, 0x67, 0x48, 0x62, 0x51, 0x70, 0x49, 0x51, 0x6a, 0x62, 0x2e, 0x35, 0x0c, 0x34, 0x8a,
0x82, 0x3a, 0x36, 0x35, 0x2f, 0x85, 0xda, 0x8e, 0x75, 0x32, 0xbb, 0xf0, 0x50, 0x8e, 0xe1, 0xc6,
0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, 0x78, 0x24, 0xc7, 0x78,
0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7,
0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x51, 0x1c, 0x30, 0xb3, 0x92, 0xd8, 0xc0,
0xc1, 0x6f, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x72, 0x81, 0x1a, 0x91, 0x90, 0x01, 0x00, 0x00,
}

View File

@ -9,6 +9,11 @@ service FileSync{
rpc TarStream(stream BytesMessage) returns (stream BytesMessage);
}
service FileSend{
rpc DiffCopy(stream BytesMessage) returns (stream BytesMessage);
}
// BytesMessage contains a chunk of byte data
message BytesMessage{
bytes data = 1;

View File

@ -55,18 +55,3 @@ func (lm *localMounter) Mount() (string, error) {
lm.target = dir
return dir, nil
}
func (lm *localMounter) Unmount() error {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.target != "" {
if err := mount.Unmount(lm.target, 0); err != nil {
return err
}
os.RemoveAll(lm.target)
lm.target = ""
}
return nil
}

View File

@ -0,0 +1,25 @@
// +build !windows
package snapshot
import (
"os"
"syscall"
"github.com/containerd/containerd/mount"
)
func (lm *localMounter) Unmount() error {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.target != "" {
if err := mount.Unmount(lm.target, syscall.MNT_DETACH); err != nil {
return err
}
os.RemoveAll(lm.target)
lm.target = ""
}
return nil
}

View File

@ -0,0 +1,22 @@
package snapshot
import (
"os"
"github.com/containerd/containerd/mount"
)
func (lm *localMounter) Unmount() error {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.target != "" {
if err := mount.Unmount(lm.target, 0); err != nil {
return err
}
os.RemoveAll(lm.target)
lm.target = ""
}
return nil
}

View File

@ -203,9 +203,6 @@ func (s *Solver) walkVertex(ctx context.Context, g *vertex, index Index, fn func
inputCacheKeys := make([][]digest.Digest, len(g.inputs))
walkerStopped := false
inputCtx, cancelInputCtx := context.WithCancel(ctx)
defer cancelInputCtx()
inputRefs := make([]Reference, len(g.inputs))
defer func() {
@ -218,6 +215,8 @@ func (s *Solver) walkVertex(ctx context.Context, g *vertex, index Index, fn func
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
inputCtx, cancelInputCtx := context.WithCancel(ctx)
defer cancelInputCtx()
for i, in := range g.inputs {
func(i int, in *input) {
eg.Go(func() error {

View File

@ -36,7 +36,7 @@ github.com/BurntSushi/locker 392720b78f44e9d0249fcac6c43b111b47a370b8
github.com/docker/docker 6301ac0c27aef52a96820482a01869457ae416f7 https://github.com/mcandre/moby.git
github.com/pkg/profile 5b67d428864e92711fcbd2f8629456121a56d91f
github.com/tonistiigi/fsutil 195d62bee906e45aa700b8ebeb3417f7b126bb23
github.com/tonistiigi/fsutil d49833a9a6fa5b41f63e7e338038633d10276b57
github.com/stevvooe/continuity 86cec1535a968310e7532819f699ff2830ed7463
github.com/dmcgowan/go-tar 2e2c51242e8993c50445dab7c03c8e7febddd0cf
github.com/hashicorp/go-immutable-radix 826af9ccf0feeee615d546d69b11f8e98da8c8f1 git://github.com/tonistiigi/go-immutable-radix.git

View File

@ -38,3 +38,7 @@ func GetWalkerFn(root string) walkerFn {
})
}
}
func emptyWalker(ctx context.Context, pathC chan<- *currentPath) error {
return nil
}

View File

@ -1,5 +1,3 @@
// +build linux windows
package fsutil
import (
@ -24,8 +22,11 @@ type DiskWriterOpt struct {
SyncDataCb WriteToFunc
NotifyCb func(ChangeKind, string, os.FileInfo, error) error
ContentHasher ContentHasher
Filter FilterFunc
}
type FilterFunc func(*Stat) bool
type DiskWriter struct {
opt DiskWriterOpt
dest string
@ -34,6 +35,7 @@ type DiskWriter struct {
ctx context.Context
cancel func()
eg *errgroup.Group
filter FilterFunc
}
func NewDiskWriter(ctx context.Context, dest string, opt DiskWriterOpt) (*DiskWriter, error) {
@ -99,6 +101,12 @@ func (dw *DiskWriter) HandleChange(kind ChangeKind, p string, fi os.FileInfo, er
return errors.Errorf("%s invalid change without stat information", p)
}
if dw.filter != nil {
if ok := dw.filter(stat); !ok {
return nil
}
}
rename := true
oldFi, err := os.Lstat(destPath)
if err != nil {

View File

@ -0,0 +1,7 @@
// +build darwin
package fsutil
func chtimes(path string, un int64) error {
return nil
}

View File

@ -3,36 +3,10 @@
package fsutil
import (
"os"
"syscall"
"github.com/pkg/errors"
"github.com/stevvooe/continuity/sysx"
"golang.org/x/sys/unix"
)
func rewriteMetadata(p string, stat *Stat) error {
for key, value := range stat.Xattrs {
sysx.Setxattr(p, key, value, 0)
}
if err := os.Lchown(p, int(stat.Uid), int(stat.Gid)); err != nil {
return errors.Wrapf(err, "failed to lchown %s", p)
}
if os.FileMode(stat.Mode)&os.ModeSymlink == 0 {
if err := os.Chmod(p, os.FileMode(stat.Mode)); err != nil {
return errors.Wrapf(err, "failed to chown %s", p)
}
}
if err := chtimes(p, stat.ModTime); err != nil {
return errors.Wrapf(err, "failed to chtimes %s", p)
}
return nil
}
func chtimes(path string, un int64) error {
var utimes [2]unix.Timespec
utimes[0] = unix.NsecToTimespec(un)
@ -44,21 +18,3 @@ func chtimes(path string, un int64) error {
return nil
}
// handleTarTypeBlockCharFifo is an OS-specific helper function used by
// createTarFile to handle the following types of header: Block; Char; Fifo
func handleTarTypeBlockCharFifo(path string, stat *Stat) error {
mode := uint32(stat.Mode & 07777)
if os.FileMode(stat.Mode)&os.ModeCharDevice != 0 {
mode |= syscall.S_IFCHR
} else if os.FileMode(stat.Mode)&os.ModeNamedPipe != 0 {
mode |= syscall.S_IFIFO
} else {
mode |= syscall.S_IFBLK
}
if err := syscall.Mknod(path, mode, int(mkdev(stat.Devmajor, stat.Devminor))); err != nil {
return err
}
return nil
}

51
vendor/github.com/tonistiigi/fsutil/diskwriter_unix.go generated vendored Normal file
View File

@ -0,0 +1,51 @@
// +build !windows
package fsutil
import (
"os"
"syscall"
"github.com/pkg/errors"
"github.com/stevvooe/continuity/sysx"
)
func rewriteMetadata(p string, stat *Stat) error {
for key, value := range stat.Xattrs {
sysx.Setxattr(p, key, value, 0)
}
if err := os.Lchown(p, int(stat.Uid), int(stat.Gid)); err != nil {
return errors.Wrapf(err, "failed to lchown %s", p)
}
if os.FileMode(stat.Mode)&os.ModeSymlink == 0 {
if err := os.Chmod(p, os.FileMode(stat.Mode)); err != nil {
return errors.Wrapf(err, "failed to chown %s", p)
}
}
if err := chtimes(p, stat.ModTime); err != nil {
return errors.Wrapf(err, "failed to chtimes %s", p)
}
return nil
}
// handleTarTypeBlockCharFifo is an OS-specific helper function used by
// createTarFile to handle the following types of header: Block; Char; Fifo
func handleTarTypeBlockCharFifo(path string, stat *Stat) error {
mode := uint32(stat.Mode & 07777)
if os.FileMode(stat.Mode)&os.ModeCharDevice != 0 {
mode |= syscall.S_IFCHR
} else if os.FileMode(stat.Mode)&os.ModeNamedPipe != 0 {
mode |= syscall.S_IFIFO
} else {
mode |= syscall.S_IFBLK
}
if err := syscall.Mknod(path, mode, int(mkdev(stat.Devmajor, stat.Devminor))); err != nil {
return err
}
return nil
}

View File

@ -1,5 +1,3 @@
// +build linux windows
package fsutil
import (
@ -12,7 +10,15 @@ import (
"golang.org/x/sync/errgroup"
)
func Receive(ctx context.Context, conn Stream, dest string, notifyHashed ChangeFunc, contentHasher ContentHasher, progressCb func(int, bool)) error {
type ReceiveOpt struct {
NotifyHashed ChangeFunc
ContentHasher ContentHasher
ProgressCb func(int, bool)
Merge bool
Filter FilterFunc
}
func Receive(ctx context.Context, conn Stream, dest string, opt ReceiveOpt) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -21,9 +27,11 @@ func Receive(ctx context.Context, conn Stream, dest string, notifyHashed ChangeF
dest: dest,
files: make(map[string]uint32),
pipes: make(map[uint32]io.WriteCloser),
notifyHashed: notifyHashed,
contentHasher: contentHasher,
progressCb: progressCb,
notifyHashed: opt.NotifyHashed,
contentHasher: opt.ContentHasher,
progressCb: opt.ProgressCb,
merge: opt.Merge,
filter: opt.Filter,
}
return r.run(ctx)
}
@ -36,6 +44,8 @@ type receiver struct {
mu sync.RWMutex
muPipes sync.RWMutex
progressCb func(int, bool)
merge bool
filter FilterFunc
notifyHashed ChangeFunc
contentHasher ContentHasher
@ -88,6 +98,7 @@ func (r *receiver) run(ctx context.Context) error {
AsyncDataCb: r.asyncDataFunc,
NotifyCb: r.notifyHashed,
ContentHasher: r.contentHasher,
Filter: r.filter,
})
if err != nil {
return err
@ -96,7 +107,11 @@ func (r *receiver) run(ctx context.Context) error {
w := newDynamicWalker()
g.Go(func() error {
err := doubleWalkDiff(ctx, dw.HandleChange, GetWalkerFn(r.dest), w.fill)
destWalker := emptyWalker
if !r.merge {
destWalker = GetWalkerFn(r.dest)
}
err := doubleWalkDiff(ctx, dw.HandleChange, destWalker, w.fill)
if err != nil {
return err
}

View File

@ -1,14 +0,0 @@
// +build !linux,!windows
package fsutil
import (
"runtime"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
func Receive(ctx context.Context, conn Stream, dest string, notifyHashed ChangeFunc, contentHasher ContentHasher, progressCb func(int, bool)) error {
return errors.Errorf("receive is unsupported in %s", runtime.GOOS)
}