buildkit/cache/estargz.go

246 lines
6.4 KiB
Go

package cache
import (
"archive/tar"
"context"
"fmt"
"io"
"strconv"
"sync"
cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
// eStargzAnnotations lists the eStargz-specific annotation keys: the TOC JSON
// digest and the uncompressed size of the blob.
var eStargzAnnotations = []string{
	estargz.TOCJSONDigestAnnotation,
	estargz.StoreUncompressedSizeAnnotation,
}
// compressEStargz returns a compressor that writes the passed blob stream as an
// eStargz-compressed blob, together with a finalize function.
//
// The compressor only accepts media types that compression.FromMediaType maps
// to compression.Gzip. Conversion runs in a background goroutine that reads the
// tar stream from the returned io.WriteCloser. The writer's Close waits for that
// goroutine to finish but does not report its error; any conversion error is
// recorded and surfaced by finalize instead.
//
// finalize sets the containerd uncompressed-digest label on the written blob in
// the content store and returns all eStargz annotations: the TOC digest, the
// uncompressed size and the uncompressed digest.
func compressEStargz() (compressorFunc compressor, finalize func(context.Context, content.Store) (map[string]string, error)) {
	// Shared between the conversion goroutine and finalize; guarded by mu.
	var cInfo *compressionInfo
	var writeErr error
	var mu sync.Mutex
	return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
		if compression.FromMediaType(requiredMediaType) != compression.Gzip {
			return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
		}
		done := make(chan struct{})
		pr, pw := io.Pipe()
		go func() (retErr error) {
			defer close(done)
			defer func() {
				if retErr != nil {
					mu.Lock()
					writeErr = retErr
					mu.Unlock()
				}
			}()

			// blobInfoW observes the compressed output alongside dest so the
			// compressed digest, uncompressed digest and size can be computed.
			blobInfoW, bInfoCh := calculateBlobInfo()
			defer blobInfoW.Close()
			w := estargz.NewWriter(io.MultiWriter(dest, blobInfoW))

			// Using lossless API here to make sure that decompressEStargz provides the exact
			// same tar as the original.
			//
			// Note that we don't support eStargz compression for tar that contains a file named
			// `stargz.index.json` because we cannot create eStargz in lossless way for such blob
			// (we must overwrite stargz.index.json file).
			if err := w.AppendTarLossLess(pr); err != nil {
				pr.CloseWithError(err)
				return err
			}
			tocDgst, err := w.Close()
			if err != nil {
				pr.CloseWithError(err)
				return err
			}
			if err := blobInfoW.Close(); err != nil {
				pr.CloseWithError(err)
				return err
			}
			// A closed channel (ok == false) means the blob info could not be
			// calculated; fail rather than record zero-valued digests.
			bInfo, ok := <-bInfoCh
			if !ok {
				err := errors.New("failed to calculate blob info of estargz blob")
				pr.CloseWithError(err)
				return err
			}
			mu.Lock()
			cInfo = &compressionInfo{bInfo, tocDgst}
			mu.Unlock()
			pr.Close()
			return nil
		}()
		return &writeCloser{pw, func() error {
			<-done // wait until the write completes
			return nil
		}}, nil
	}, func(ctx context.Context, cs content.Store) (map[string]string, error) {
		// Snapshot the shared state under the lock.
		mu.Lock()
		cInfo, writeErr := cInfo, writeErr
		mu.Unlock()
		if cInfo == nil {
			if writeErr != nil {
				return nil, errors.Wrap(writeErr, "cannot finalize due to write error")
			}
			return nil, errors.Errorf("cannot finalize (reason unknown)")
		}

		// Fill necessary labels
		info, err := cs.Info(ctx, cInfo.compressedDigest)
		if err != nil {
			return nil, errors.Wrap(err, "failed to get info from content store")
		}
		if info.Labels == nil {
			info.Labels = make(map[string]string)
		}
		info.Labels[containerdUncompressed] = cInfo.uncompressedDigest.String()
		if _, err := cs.Update(ctx, info, "labels."+containerdUncompressed); err != nil {
			return nil, err
		}

		// Fill annotations
		a := make(map[string]string)
		a[estargz.TOCJSONDigestAnnotation] = cInfo.tocDigest.String()
		a[estargz.StoreUncompressedSizeAnnotation] = strconv.FormatInt(cInfo.uncompressedSize, 10)
		a[containerdUncompressed] = cInfo.uncompressedDigest.String()
		return a, nil
	}
}
// estargzLabel is the content-store label key under which isEStargz caches
// whether a blob is eStargz, stored as "true" or "false".
const (
	estargzLabel = "buildkit.io/compression/estargz"
)
// isEStargz returns true when the specified digest of content exists in
// the content store and it's eStargz.
//
// The answer is cached in the blob's estargzLabel label ("true"/"false"). A
// parseable cached value is returned without reading the blob. Otherwise the
// blob is inspected:
//   - it must have an eStargz footer;
//   - the data from the TOC offset to the end, once gzip-decompressed, must
//     hold exactly one tar entry, named estargz.TOCTarName.
//
// The computed result is then written back to the label.
//
// NOTE(review): any cs.Info error (not only "not found") yields (false, nil),
// and errors while inspecting the blob are treated as "not eStargz". An error
// is returned only when updating the cache label fails.
func isEStargz(ctx context.Context, cs content.Store, dgst digest.Digest) (bool, error) {
	info, err := cs.Info(ctx, dgst)
	if err != nil {
		return false, nil
	}
	if isEsgzStr, ok := info.Labels[estargzLabel]; ok {
		if isEsgz, err := strconv.ParseBool(isEsgzStr); err == nil {
			return isEsgz, nil
		}
	}

	res := func() bool {
		r, err := cs.ReaderAt(ctx, ocispecs.Descriptor{Digest: dgst})
		if err != nil {
			return false
		}
		defer r.Close()
		sr := io.NewSectionReader(r, 0, r.Size())

		// Does this have the footer?
		tocOffset, _, err := estargz.OpenFooter(sr)
		if err != nil {
			return false
		}

		// Is TOC the final entry?
		decompressor := new(estargz.GzipDecompressor)
		rr, err := decompressor.Reader(io.NewSectionReader(sr, tocOffset, sr.Size()-tocOffset))
		if err != nil {
			return false
		}
		defer rr.Close() // the decompressed reader is an io.ReadCloser; release it
		tr := tar.NewReader(rr)
		h, err := tr.Next()
		if err != nil {
			return false
		}
		if h.Name != estargz.TOCTarName {
			return false
		}
		if _, err = tr.Next(); err != io.EOF { // must be EOF
			return false
		}
		return true
	}()

	if info.Labels == nil {
		info.Labels = make(map[string]string)
	}
	info.Labels[estargzLabel] = strconv.FormatBool(res) // cache the result
	if _, err := cs.Update(ctx, info, "labels."+estargzLabel); err != nil {
		return false, err
	}
	return res, nil
}
// decompressEStargz unpacks the eStargz blob read from r back into its tar
// stream, decoding the gzip members with estargz's gzip decompressor.
func decompressEStargz(r *io.SectionReader) (io.ReadCloser, error) {
	gzipDecompressor := new(estargz.GzipDecompressor)
	return estargz.Unpack(r, gzipDecompressor)
}
// writeCloser wraps an io.WriteCloser and runs closeFunc after the wrapped
// writer has been closed.
type writeCloser struct {
	io.WriteCloser
	closeFunc func() error
}

// Close closes the wrapped writer and then always runs closeFunc, even if the
// first close failed, so closeFunc's work is never skipped.
//
// If both steps fail, the close error is returned wrapped with closeFunc's
// error in the message. If only the wrapped close fails, it is wrapped with
// "failed to close". Otherwise closeFunc's result is returned.
func (wc *writeCloser) Close() error {
	closeErr := wc.WriteCloser.Close()
	funcErr := wc.closeFunc()
	switch {
	case closeErr != nil && funcErr != nil:
		return errors.Wrapf(closeErr, "failed to close: %v", funcErr)
	case closeErr != nil:
		// Avoid rendering a nil closeFunc error as "<nil>" in the message.
		return errors.Wrap(closeErr, "failed to close")
	default:
		return funcErr
	}
}
// counter is an io.Writer that discards its input and only keeps track of the
// total number of bytes written to it. Access is serialized by mu.
type counter struct {
	n  int64
	mu sync.Mutex
}

// Write adds len(p) to the running total. It never returns an error.
func (cnt *counter) Write(p []byte) (int, error) {
	written := len(p)
	cnt.mu.Lock()
	defer cnt.mu.Unlock()
	cnt.n += int64(written)
	return written, nil
}

// size returns the number of bytes written so far.
func (cnt *counter) size() int64 {
	cnt.mu.Lock()
	defer cnt.mu.Unlock()
	return cnt.n
}
// compressionInfo holds the results of an eStargz conversion: the blob
// digests/size computed by calculateBlobInfo plus the eStargz TOC digest.
type compressionInfo struct {
blobInfo
// tocDigest is the TOC digest returned by the estargz writer's Close.
tocDigest digest.Digest
}
// blobInfo describes a blob stream as computed by calculateBlobInfo.
// NOTE: values are constructed positionally elsewhere, so field order matters.
type blobInfo struct {
// compressedDigest is the digest of the bytes exactly as written
// (in compressEStargz, the eStargz output).
compressedDigest digest.Digest
// uncompressedDigest is the digest of the decompressed stream; it is used
// as the containerd uncompressed label/annotation value.
uncompressedDigest digest.Digest
// uncompressedSize is the byte length of the decompressed stream.
uncompressedSize int64
}
// calculateBlobInfo returns a writer for a blob stream and a channel that
// yields the blob's digests and uncompressed size.
//
// Everything written to the returned writer is digested as-is
// (compressedDigest). It is also decompressed with containerd's
// DecompressStream to compute uncompressedDigest and uncompressedSize. The
// result becomes available after the writer is closed and the stream has been
// fully processed.
//
// The channel has capacity 1, so the background goroutine never blocks on
// delivery even if the caller stops listening. The channel is closed when the
// goroutine exits, so a receive with ok == false means the calculation failed.
// On failure, the reading end of the pipe is closed with the error, so
// subsequent writes return it.
func calculateBlobInfo() (io.WriteCloser, chan blobInfo) {
	res := make(chan blobInfo, 1)
	pr, pw := io.Pipe()
	go func() {
		defer close(res) // runs last: signals failure (or end of result) to receivers
		defer pr.Close()
		c := new(counter)
		dgstr := digest.Canonical.Digester()
		diffID := digest.Canonical.Digester()
		decompressR, err := cdcompression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
		if err != nil {
			pr.CloseWithError(err)
			return
		}
		defer decompressR.Close()
		if _, err := io.Copy(io.MultiWriter(c, diffID.Hash()), decompressR); err != nil {
			pr.CloseWithError(err)
			return
		}
		if err := decompressR.Close(); err != nil {
			pr.CloseWithError(err)
			return
		}
		res <- blobInfo{dgstr.Digest(), diffID.Digest(), c.size()}
	}()
	return pw, res
}