diff --git a/go.mod b/go.mod index 9adf138f..de5ff95f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Microsoft/go-winio v0.5.1 github.com/Microsoft/hcsshim v0.9.1 github.com/agext/levenshtein v1.2.3 + github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 github.com/containerd/console v1.0.3 github.com/containerd/containerd v1.6.0-beta.3 github.com/containerd/containerd/api v1.6.0-beta.3 diff --git a/go.sum b/go.sum index 94ab03f6..b65f8fe1 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,8 @@ github.com/apex/logs v0.0.4/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDw github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= +github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs= +github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 7605e1b2..12534712 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -330,9 +330,14 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu } meta.Env = addDefaultEnvvar(meta.Env, "PATH", utilsystem.DefaultPathEnv(currentOS)) - stdout, stderr := logs.NewLogStreams(ctx, os.Getenv("BUILDKIT_DEBUG_EXEC_OUTPUT") == "1") + stdout, stderr, flush := logs.NewLogStreams(ctx, os.Getenv("BUILDKIT_DEBUG_EXEC_OUTPUT") == "1") defer stdout.Close() defer stderr.Close() + defer func() { + if err != nil { + flush() + } + }() execErr := e.exec.Run(ctx, "", p.Root, p.Mounts, executor.ProcessInfo{ Meta: meta, diff --git a/source/git/gitsource.go b/source/git/gitsource.go index 7b4ec59e..0ddb95a7 100644 --- a/source/git/gitsource.go +++ b/source/git/gitsource.go @@ -631,11 +631,16 @@ func getGitSSHCommand(knownHosts string) string { return gitSSHCommand } -func git(ctx context.Context, dir, sshAuthSock, knownHosts string, args ...string) (*bytes.Buffer, error) { +func git(ctx context.Context, dir, sshAuthSock, knownHosts string, args ...string) (_ *bytes.Buffer, err error) { for { - stdout, stderr := logs.NewLogStreams(ctx, false) + stdout, stderr, flush := logs.NewLogStreams(ctx, false) defer stdout.Close() defer stderr.Close() + defer func() { + if err != nil { + flush() + } + }() cmd := exec.Command("git", args...) cmd.Dir = dir // some commands like submodule require this buf := bytes.NewBuffer(nil) diff --git a/util/progress/logs/logs.go b/util/progress/logs/logs.go index 07c7bd89..a5c831c8 100644 --- a/util/progress/logs/logs.go +++ b/util/progress/logs/logs.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/armon/circbuf" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/util/progress" @@ -27,11 +28,16 @@ const ( var configCheckOnce sync.Once -func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser) { - return newStreamWriter(ctx, stdout, printOutput), newStreamWriter(ctx, stderr, printOutput) +func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.WriteCloser, func()) { + stdout := newStreamWriter(ctx, stdout, printOutput) + stderr := newStreamWriter(ctx, stderr, printOutput) + return stdout, stderr, func() { + stdout.flushBuffer() + stderr.flushBuffer() + } } -func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.WriteCloser { +func newStreamWriter(ctx context.Context, stream int, printOutput bool) *streamWriter { pw, _, _ := progress.NewFromContext(ctx) return &streamWriter{ pw: pw, @@ -49,6 +55,7 @@ type streamWriter struct { size int clipping bool clipReasonSpeed bool + buf *circbuf.Buffer } func (sw *streamWriter) checkLimit(n int) int { @@ -97,7 +104,19 @@ func (sw *streamWriter) clipLimitMessage() string { func (sw *streamWriter) Write(dt []byte) (int, error) { oldSize := len(dt) - dt = append([]byte{}, dt[:sw.checkLimit(len(dt))]...) + limit := sw.checkLimit(len(dt)) + if sw.buf == nil && limit < len(dt) { + var err error + sw.buf, err = circbuf.NewBuffer(256 * 1024) + if err != nil { + return 0, err + } + } + if sw.buf != nil { + sw.buf.Write(dt) + } + + dt = append([]byte{}, dt[:limit]...) if sw.clipping && oldSize == len(dt) { sw.clipping = false @@ -107,25 +126,42 @@ func (sw *streamWriter) Write(dt []byte) (int, error) { sw.clipping = true } - if len(dt) != 0 { - sw.pw.Write(identity.NewID(), client.VertexLog{ - Stream: sw.stream, - Data: dt, - }) - if sw.printOutput { - switch sw.stream { - case 1: - return os.Stdout.Write(dt) - case 2: - return os.Stderr.Write(dt) - default: - return 0, errors.Errorf("invalid stream %d", sw.stream) - } - } + _, err := sw.write(dt) + if err != nil { + return 0, err } return oldSize, nil } +func (sw *streamWriter) write(dt []byte) (int, error) { + if len(dt) == 0 { + return 0, nil + } + sw.pw.Write(identity.NewID(), client.VertexLog{ + Stream: sw.stream, + Data: dt, + }) + if sw.printOutput { + switch sw.stream { + case 1: + return os.Stdout.Write(dt) + case 2: + return os.Stderr.Write(dt) + default: + return 0, errors.Errorf("invalid stream %d", sw.stream) + } + } + return len(dt), nil +} + +func (sw *streamWriter) flushBuffer() { + if sw.buf == nil { + return + } + _, _ = sw.write(sw.buf.Bytes()) + sw.buf = nil +} + func (sw *streamWriter) Close() error { return sw.pw.Close() } diff --git a/vendor/github.com/armon/circbuf/.gitignore b/vendor/github.com/armon/circbuf/.gitignore new file mode 100644 index 00000000..00268614 --- /dev/null +++ b/vendor/github.com/armon/circbuf/.gitignore @@ -0,0 +1,22 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe diff --git a/vendor/github.com/armon/circbuf/LICENSE b/vendor/github.com/armon/circbuf/LICENSE new file mode 100644 index 00000000..106569e5 --- /dev/null +++ b/vendor/github.com/armon/circbuf/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Armon Dadgar + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/armon/circbuf/README.md b/vendor/github.com/armon/circbuf/README.md new file mode 100644 index 00000000..f2e356b8 --- /dev/null +++ b/vendor/github.com/armon/circbuf/README.md @@ -0,0 +1,28 @@ +circbuf +======= + +This repository provides the `circbuf` package. This provides a `Buffer` object +which is a circular (or ring) buffer. It has a fixed size, but can be written +to infinitely. Only the last `size` bytes are ever retained. The buffer implements +the `io.Writer` interface. + +Documentation +============= + +Full documentation can be found on [Godoc](http://godoc.org/github.com/armon/circbuf) + +Usage +===== + +The `circbuf` package is very easy to use: + +```go +buf, _ := NewBuffer(6) +buf.Write([]byte("hello world")) + +if string(buf.Bytes()) != " world" { + panic("should only have last 6 bytes!") +} + +``` + diff --git a/vendor/github.com/armon/circbuf/circbuf.go b/vendor/github.com/armon/circbuf/circbuf.go new file mode 100644 index 00000000..de3cb94a --- /dev/null +++ b/vendor/github.com/armon/circbuf/circbuf.go @@ -0,0 +1,92 @@ +package circbuf + +import ( + "fmt" +) + +// Buffer implements a circular buffer. It is a fixed size, +// and new writes overwrite older data, such that for a buffer +// of size N, for any amount of writes, only the last N bytes +// are retained. +type Buffer struct { + data []byte + size int64 + writeCursor int64 + written int64 +} + +// NewBuffer creates a new buffer of a given size. The size +// must be greater than 0. +func NewBuffer(size int64) (*Buffer, error) { + if size <= 0 { + return nil, fmt.Errorf("Size must be positive") + } + + b := &Buffer{ + size: size, + data: make([]byte, size), + } + return b, nil +} + +// Write writes up to len(buf) bytes to the internal ring, +// overriding older data if necessary. +func (b *Buffer) Write(buf []byte) (int, error) { + // Account for total bytes written + n := len(buf) + b.written += int64(n) + + // If the buffer is larger than ours, then we only care + // about the last size bytes anyways + if int64(n) > b.size { + buf = buf[int64(n)-b.size:] + } + + // Copy in place + remain := b.size - b.writeCursor + copy(b.data[b.writeCursor:], buf) + if int64(len(buf)) > remain { + copy(b.data, buf[remain:]) + } + + // Update location of the cursor + b.writeCursor = ((b.writeCursor + int64(len(buf))) % b.size) + return n, nil +} + +// Size returns the size of the buffer +func (b *Buffer) Size() int64 { + return b.size +} + +// TotalWritten provides the total number of bytes written +func (b *Buffer) TotalWritten() int64 { + return b.written +} + +// Bytes provides a slice of the bytes written. This +// slice should not be written to. +func (b *Buffer) Bytes() []byte { + switch { + case b.written >= b.size && b.writeCursor == 0: + return b.data + case b.written > b.size: + out := make([]byte, b.size) + copy(out, b.data[b.writeCursor:]) + copy(out[b.size-b.writeCursor:], b.data[:b.writeCursor]) + return out + default: + return b.data[:b.writeCursor] + } +} + +// Reset resets the buffer so it has no content. +func (b *Buffer) Reset() { + b.writeCursor = 0 + b.written = 0 +} + +// String returns the contents of the buffer as a string +func (b *Buffer) String() string { + return string(b.Bytes()) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 57dff499..5c0bf87a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -39,6 +39,9 @@ github.com/Microsoft/hcsshim/pkg/ociwclayer # github.com/agext/levenshtein v1.2.3 ## explicit github.com/agext/levenshtein +# github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 +## explicit +github.com/armon/circbuf # github.com/beorn7/perks v1.0.1 ## explicit; go 1.11 github.com/beorn7/perks/quantile