diff --git a/solver/source.go b/solver/source.go index 647deeb4..df8348c9 100644 --- a/solver/source.go +++ b/solver/source.go @@ -23,7 +23,11 @@ func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) if err != nil { return nil, err } - ref, err := s.sm.Pull(ctx, id) + src, err := s.sm.Resolve(ctx, id) + if err != nil { + return nil, err + } + ref, err := src.Snapshot(ctx) if err != nil { return nil, err } diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index e6a1b493..a13744f9 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -62,29 +62,63 @@ func (is *imageSource) ID() string { return source.DockerImageScheme } -func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.ImmutableRef, error) { - // TODO: update this to always centralize layer downloads/unpacks - // TODO: progress status +type puller struct { + is *imageSource + resolveOnce sync.Once + src *source.ImageIdentifier + desc ocispec.Descriptor + ref string + resolveErr error +} +func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) { imageIdentifier, ok := id.(*source.ImageIdentifier) if !ok { return nil, errors.New("invalid identifier") } - resolveProgressDone := oneOffProgress(ctx, "resolve "+imageIdentifier.Reference.String()) - ref, desc, err := is.resolver.Resolve(ctx, imageIdentifier.Reference.String()) - if err != nil { - return nil, resolveProgressDone(err) + p := &puller{ + src: imageIdentifier, + is: is, } - resolveProgressDone(nil) + return p, nil +} - ongoing := newJobs(ref) +func (p *puller) resolve(ctx context.Context) error { + p.resolveOnce.Do(func() { + resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String()) + ref, desc, err := p.is.resolver.Resolve(ctx, p.src.Reference.String()) + if err != nil { + p.resolveErr = err + resolveProgressDone(err) + return + } + p.desc = desc + p.ref = ref + resolveProgressDone(nil) + }) + return p.resolveErr +} + +func (p *puller) CacheKey(ctx context.Context) ([]string, error) { + if err := p.resolve(ctx); err != nil { + return nil, err + } + return []string{p.desc.Digest.String()}, nil +} + +func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { + if err := p.resolve(ctx); err != nil { + return nil, err + } + + ongoing := newJobs(p.ref) pctx, stopProgress := context.WithCancel(ctx) - go showProgress(pctx, ongoing, is.ContentStore) + go showProgress(pctx, ongoing, p.is.ContentStore) - fetcher, err := is.resolver.Fetcher(ctx, ref) + fetcher, err := p.is.resolver.Fetcher(ctx, p.ref) if err != nil { stopProgress() return nil, err @@ -98,23 +132,23 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im ongoing.add(desc) return nil, nil }), - remotes.FetchHandler(is.ContentStore, fetcher), - images.ChildrenHandler(is.ContentStore), + remotes.FetchHandler(p.is.ContentStore, fetcher), + images.ChildrenHandler(p.is.ContentStore), } - if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil { + if err := images.Dispatch(ctx, images.Handlers(handlers...), p.desc); err != nil { stopProgress() return nil, err } stopProgress() - unpackProgressDone := oneOffProgress(ctx, "unpacking "+imageIdentifier.Reference.String()) - chainid, err := is.unpack(ctx, desc) + unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.src.Reference.String()) + chainid, err := p.is.unpack(ctx, p.desc) if err != nil { return nil, unpackProgressDone(err) } unpackProgressDone(nil) - return is.CacheAccessor.Get(ctx, chainid) + return p.is.CacheAccessor.Get(ctx, chainid) } func (is *imageSource) unpack(ctx context.Context, desc ocispec.Descriptor) (string, error) { diff --git a/source/manager.go b/source/manager.go index b7c9f67f..6f41afa8 100644 --- a/source/manager.go +++ b/source/manager.go @@ -10,18 +10,13 @@ import ( type Source interface { ID() string - Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) + Resolve(ctx context.Context, id Identifier) (SourceInstance, error) } -// type Source interface { -// ID() string -// Resolve(ctx context.Context, id Identifier) (SourceInstance, error) -// } -// -// type SourceInstance interface { -// GetCacheKey(ctx context.Context) ([]string, error) -// GetSnapshot(ctx context.Context) (cache.ImmutableRef, error) -// } +type SourceInstance interface { + CacheKey(ctx context.Context) ([]string, error) + Snapshot(ctx context.Context) (cache.ImmutableRef, error) +} type Manager struct { mu sync.Mutex @@ -40,7 +35,7 @@ func (sm *Manager) Register(src Source) { sm.mu.Unlock() } -func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) { +func (sm *Manager) Resolve(ctx context.Context, id Identifier) (SourceInstance, error) { sm.mu.Lock() src, ok := sm.sources[id.ID()] sm.mu.Unlock() @@ -49,5 +44,5 @@ func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, return nil, errors.Errorf("no handler fro %s", id.ID()) } - return src.Pull(ctx, id) + return src.Resolve(ctx, id) }