source: update interface for cache calls
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
9a862b9a28
commit
c9e2493f06
|
@ -23,7 +23,11 @@ func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ref, err := s.sm.Pull(ctx, id)
|
||||
src, err := s.sm.Resolve(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ref, err := src.Snapshot(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -62,29 +62,63 @@ func (is *imageSource) ID() string {
|
|||
return source.DockerImageScheme
|
||||
}
|
||||
|
||||
func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.ImmutableRef, error) {
|
||||
// TODO: update this to always centralize layer downloads/unpacks
|
||||
// TODO: progress status
|
||||
type puller struct {
|
||||
is *imageSource
|
||||
resolveOnce sync.Once
|
||||
src *source.ImageIdentifier
|
||||
desc ocispec.Descriptor
|
||||
ref string
|
||||
resolveErr error
|
||||
}
|
||||
|
||||
func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
|
||||
imageIdentifier, ok := id.(*source.ImageIdentifier)
|
||||
if !ok {
|
||||
return nil, errors.New("invalid identifier")
|
||||
}
|
||||
|
||||
resolveProgressDone := oneOffProgress(ctx, "resolve "+imageIdentifier.Reference.String())
|
||||
ref, desc, err := is.resolver.Resolve(ctx, imageIdentifier.Reference.String())
|
||||
if err != nil {
|
||||
return nil, resolveProgressDone(err)
|
||||
p := &puller{
|
||||
src: imageIdentifier,
|
||||
is: is,
|
||||
}
|
||||
resolveProgressDone(nil)
|
||||
return p, nil
|
||||
}
|
||||
|
||||
ongoing := newJobs(ref)
|
||||
func (p *puller) resolve(ctx context.Context) error {
|
||||
p.resolveOnce.Do(func() {
|
||||
resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String())
|
||||
ref, desc, err := p.is.resolver.Resolve(ctx, p.src.Reference.String())
|
||||
if err != nil {
|
||||
p.resolveErr = err
|
||||
resolveProgressDone(err)
|
||||
return
|
||||
}
|
||||
p.desc = desc
|
||||
p.ref = ref
|
||||
resolveProgressDone(nil)
|
||||
})
|
||||
return p.resolveErr
|
||||
}
|
||||
|
||||
func (p *puller) CacheKey(ctx context.Context) ([]string, error) {
|
||||
if err := p.resolve(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []string{p.desc.Digest.String()}, nil
|
||||
}
|
||||
|
||||
func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
|
||||
if err := p.resolve(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ongoing := newJobs(p.ref)
|
||||
|
||||
pctx, stopProgress := context.WithCancel(ctx)
|
||||
|
||||
go showProgress(pctx, ongoing, is.ContentStore)
|
||||
go showProgress(pctx, ongoing, p.is.ContentStore)
|
||||
|
||||
fetcher, err := is.resolver.Fetcher(ctx, ref)
|
||||
fetcher, err := p.is.resolver.Fetcher(ctx, p.ref)
|
||||
if err != nil {
|
||||
stopProgress()
|
||||
return nil, err
|
||||
|
@ -98,23 +132,23 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im
|
|||
ongoing.add(desc)
|
||||
return nil, nil
|
||||
}),
|
||||
remotes.FetchHandler(is.ContentStore, fetcher),
|
||||
images.ChildrenHandler(is.ContentStore),
|
||||
remotes.FetchHandler(p.is.ContentStore, fetcher),
|
||||
images.ChildrenHandler(p.is.ContentStore),
|
||||
}
|
||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
|
||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), p.desc); err != nil {
|
||||
stopProgress()
|
||||
return nil, err
|
||||
}
|
||||
stopProgress()
|
||||
|
||||
unpackProgressDone := oneOffProgress(ctx, "unpacking "+imageIdentifier.Reference.String())
|
||||
chainid, err := is.unpack(ctx, desc)
|
||||
unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.src.Reference.String())
|
||||
chainid, err := p.is.unpack(ctx, p.desc)
|
||||
if err != nil {
|
||||
return nil, unpackProgressDone(err)
|
||||
}
|
||||
unpackProgressDone(nil)
|
||||
|
||||
return is.CacheAccessor.Get(ctx, chainid)
|
||||
return p.is.CacheAccessor.Get(ctx, chainid)
|
||||
}
|
||||
|
||||
func (is *imageSource) unpack(ctx context.Context, desc ocispec.Descriptor) (string, error) {
|
||||
|
|
|
@ -10,18 +10,13 @@ import (
|
|||
|
||||
type Source interface {
|
||||
ID() string
|
||||
Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error)
|
||||
Resolve(ctx context.Context, id Identifier) (SourceInstance, error)
|
||||
}
|
||||
|
||||
// type Source interface {
|
||||
// ID() string
|
||||
// Resolve(ctx context.Context, id Identifier) (SourceInstance, error)
|
||||
// }
|
||||
//
|
||||
// type SourceInstance interface {
|
||||
// GetCacheKey(ctx context.Context) ([]string, error)
|
||||
// GetSnapshot(ctx context.Context) (cache.ImmutableRef, error)
|
||||
// }
|
||||
type SourceInstance interface {
|
||||
CacheKey(ctx context.Context) ([]string, error)
|
||||
Snapshot(ctx context.Context) (cache.ImmutableRef, error)
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
mu sync.Mutex
|
||||
|
@ -40,7 +35,7 @@ func (sm *Manager) Register(src Source) {
|
|||
sm.mu.Unlock()
|
||||
}
|
||||
|
||||
func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) {
|
||||
func (sm *Manager) Resolve(ctx context.Context, id Identifier) (SourceInstance, error) {
|
||||
sm.mu.Lock()
|
||||
src, ok := sm.sources[id.ID()]
|
||||
sm.mu.Unlock()
|
||||
|
@ -49,5 +44,5 @@ func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef,
|
|||
return nil, errors.Errorf("no handler fro %s", id.ID())
|
||||
}
|
||||
|
||||
return src.Pull(ctx, id)
|
||||
return src.Resolve(ctx, id)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue