150 lines
3.4 KiB
Go
150 lines
3.4 KiB
Go
|
package contentutil
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"io/ioutil"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/containerd/containerd/content"
|
||
|
"github.com/containerd/containerd/errdefs"
|
||
|
digest "github.com/opencontainers/go-digest"
|
||
|
"github.com/pkg/errors"
|
||
|
)
|
||
|
|
||
|
// Buffer is a content provider and ingester that keeps data in memory
|
||
|
type Buffer interface {
|
||
|
content.Provider
|
||
|
content.Ingester
|
||
|
}
|
||
|
|
||
|
// NewBuffer returns a new buffer
|
||
|
func NewBuffer() Buffer {
|
||
|
return &buffer{
|
||
|
buffers: map[digest.Digest][]byte{},
|
||
|
refs: map[string]struct{}{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type buffer struct {
|
||
|
mu sync.Mutex
|
||
|
buffers map[digest.Digest][]byte
|
||
|
refs map[string]struct{}
|
||
|
}
|
||
|
|
||
|
func (b *buffer) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
|
||
|
b.mu.Lock()
|
||
|
if _, ok := b.refs[ref]; ok {
|
||
|
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", ref)
|
||
|
}
|
||
|
b.mu.Unlock()
|
||
|
return &bufferedWriter{
|
||
|
main: b,
|
||
|
digester: digest.Canonical.Digester(),
|
||
|
buffer: bytes.NewBuffer(nil),
|
||
|
expected: expected,
|
||
|
releaseRef: func() {
|
||
|
b.mu.Lock()
|
||
|
delete(b.refs, ref)
|
||
|
b.mu.Unlock()
|
||
|
},
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func (b *buffer) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
|
||
|
r, err := b.getBytesReader(ctx, dgst)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &readerAt{Reader: r, Closer: ioutil.NopCloser(r), size: int64(r.Len())}, nil
|
||
|
}
|
||
|
|
||
|
func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
|
||
|
b.mu.Lock()
|
||
|
defer b.mu.Unlock()
|
||
|
|
||
|
if dt, ok := b.buffers[dgst]; ok {
|
||
|
return bytes.NewReader(dt), nil
|
||
|
}
|
||
|
|
||
|
return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
|
||
|
}
|
||
|
|
||
|
func (b *buffer) addValue(k digest.Digest, dt []byte) {
|
||
|
b.mu.Lock()
|
||
|
defer b.mu.Unlock()
|
||
|
b.buffers[k] = dt
|
||
|
}
|
||
|
|
||
|
type bufferedWriter struct {
|
||
|
main *buffer
|
||
|
ref string
|
||
|
offset int64
|
||
|
total int64
|
||
|
startedAt time.Time
|
||
|
updatedAt time.Time
|
||
|
buffer *bytes.Buffer
|
||
|
expected digest.Digest
|
||
|
digester digest.Digester
|
||
|
releaseRef func()
|
||
|
}
|
||
|
|
||
|
func (w *bufferedWriter) Write(p []byte) (n int, err error) {
|
||
|
n, err = w.buffer.Write(p)
|
||
|
w.digester.Hash().Write(p[:n])
|
||
|
w.offset += int64(len(p))
|
||
|
w.updatedAt = time.Now()
|
||
|
return n, err
|
||
|
}
|
||
|
|
||
|
func (w *bufferedWriter) Close() error {
|
||
|
if w.buffer != nil {
|
||
|
w.releaseRef()
|
||
|
w.buffer = nil
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (w *bufferedWriter) Status() (content.Status, error) {
|
||
|
return content.Status{
|
||
|
Ref: w.ref,
|
||
|
Offset: w.offset,
|
||
|
Total: w.total,
|
||
|
StartedAt: w.startedAt,
|
||
|
UpdatedAt: w.updatedAt,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func (w *bufferedWriter) Digest() digest.Digest {
|
||
|
return w.digester.Digest()
|
||
|
}
|
||
|
|
||
|
func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
|
||
|
if w.buffer == nil {
|
||
|
return errors.Errorf("can't commit already committed or closed")
|
||
|
}
|
||
|
if s := int64(w.buffer.Len()); size > 0 && size != s {
|
||
|
return errors.Errorf("unexpected commit size %d, expected %d", s, size)
|
||
|
}
|
||
|
dgst := w.digester.Digest()
|
||
|
if expected != "" && expected != dgst {
|
||
|
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
|
||
|
}
|
||
|
if w.expected != "" && w.expected != dgst {
|
||
|
return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
|
||
|
}
|
||
|
w.main.addValue(dgst, w.buffer.Bytes())
|
||
|
return w.Close()
|
||
|
}
|
||
|
|
||
|
func (w *bufferedWriter) Truncate(size int64) error {
|
||
|
if size != 0 {
|
||
|
return errors.New("Truncate: unsupported size")
|
||
|
}
|
||
|
w.offset = 0
|
||
|
w.digester.Hash().Reset()
|
||
|
w.buffer.Reset()
|
||
|
return nil
|
||
|
}
|