2017-06-08 00:53:36 +00:00
package control
import (
2018-01-16 22:30:10 +00:00
"context"
2018-08-30 21:06:27 +00:00
"sync"
2019-05-28 23:14:34 +00:00
"sync/atomic"
2018-07-31 20:14:53 +00:00
"time"
2018-01-16 22:30:10 +00:00
2017-06-22 20:15:46 +00:00
controlapi "github.com/moby/buildkit/api/services/control"
2018-06-30 01:35:39 +00:00
apitypes "github.com/moby/buildkit/api/types"
2018-05-07 21:24:28 +00:00
"github.com/moby/buildkit/cache/remotecache"
2017-06-22 20:15:46 +00:00
"github.com/moby/buildkit/client"
2018-07-19 16:34:44 +00:00
controlgateway "github.com/moby/buildkit/control/gateway"
2017-07-10 20:03:38 +00:00
"github.com/moby/buildkit/exporter"
2017-08-25 20:08:18 +00:00
"github.com/moby/buildkit/frontend"
2017-07-11 17:12:12 +00:00
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/grpchijack"
2018-05-11 05:58:41 +00:00
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
2018-06-22 02:06:12 +00:00
"github.com/moby/buildkit/solver/pb"
2019-05-28 23:14:34 +00:00
"github.com/moby/buildkit/util/imageutil"
2018-08-30 21:06:27 +00:00
"github.com/moby/buildkit/util/throttle"
2017-06-22 20:15:46 +00:00
"github.com/moby/buildkit/worker"
2017-06-09 01:16:19 +00:00
"github.com/pkg/errors"
2017-07-19 01:05:19 +00:00
"github.com/sirupsen/logrus"
2017-06-14 00:15:55 +00:00
"golang.org/x/sync/errgroup"
2017-06-08 00:53:36 +00:00
"google.golang.org/grpc"
)
type Opt struct {
2018-09-11 08:02:46 +00:00
SessionManager * session . Manager
WorkerController * worker . Controller
Frontends map [ string ] frontend . Frontend
CacheKeyStorage solver . CacheKeyStorage
ResolveCacheExporterFuncs map [ string ] remotecache . ResolveCacheExporterFunc
ResolveCacheImporterFuncs map [ string ] remotecache . ResolveCacheImporterFunc
2019-01-10 02:24:25 +00:00
Entitlements [ ] string
2017-06-08 00:53:36 +00:00
}
2017-06-08 22:56:44 +00:00
type Controller struct { // TODO: ControlService
2019-08-06 00:10:30 +00:00
buildCount int64
2018-07-19 16:34:44 +00:00
opt Opt
solver * llbsolver . Solver
cache solver . CacheManager
gatewayForwarder * controlgateway . GatewayForwarder
2018-08-30 21:06:27 +00:00
throttledGC func ( )
gcmu sync . Mutex
2017-06-08 00:53:36 +00:00
}
func NewController ( opt Opt ) ( * Controller , error ) {
2018-07-23 23:27:01 +00:00
cache := solver . NewCacheManager ( "local" , opt . CacheKeyStorage , worker . NewCacheResultStorage ( opt . WorkerController ) )
2018-07-19 16:34:44 +00:00
gatewayForwarder := controlgateway . NewGatewayForwarder ( )
2019-01-10 02:24:25 +00:00
solver , err := llbsolver . New ( opt . WorkerController , opt . Frontends , cache , opt . ResolveCacheImporterFuncs , gatewayForwarder , opt . SessionManager , opt . Entitlements )
2018-06-23 00:31:55 +00:00
if err != nil {
return nil , errors . Wrap ( err , "failed to create solver" )
}
2018-04-13 21:13:48 +00:00
2017-06-08 00:53:36 +00:00
c := & Controller {
2018-07-19 16:34:44 +00:00
opt : opt ,
solver : solver ,
cache : cache ,
gatewayForwarder : gatewayForwarder ,
2017-06-08 00:53:36 +00:00
}
2020-07-18 16:11:39 +00:00
c . throttledGC = throttle . After ( time . Minute , c . gc )
2018-08-30 21:06:27 +00:00
defer func ( ) {
time . AfterFunc ( time . Second , c . throttledGC )
} ( )
2017-06-08 00:53:36 +00:00
return c , nil
}
func ( c * Controller ) Register ( server * grpc . Server ) error {
controlapi . RegisterControlServer ( server , c )
2018-07-19 16:34:44 +00:00
c . gatewayForwarder . Register ( server )
2017-06-08 00:53:36 +00:00
return nil
}
2018-05-01 01:10:54 +00:00
func ( c * Controller ) DiskUsage ( ctx context . Context , r * controlapi . DiskUsageRequest ) ( * controlapi . DiskUsageResponse , error ) {
2017-06-08 00:53:36 +00:00
resp := & controlapi . DiskUsageResponse { }
2017-12-19 09:34:34 +00:00
workers , err := c . opt . WorkerController . List ( )
if err != nil {
return nil , err
}
for _ , w := range workers {
2017-12-15 08:06:54 +00:00
du , err := w . DiskUsage ( ctx , client . DiskUsageInfo {
2017-11-21 08:08:36 +00:00
Filter : r . Filter ,
2017-06-08 00:53:36 +00:00
} )
2017-11-21 08:08:36 +00:00
if err != nil {
return nil , err
}
for _ , r := range du {
resp . Record = append ( resp . Record , & controlapi . UsageRecord {
// TODO: add worker info
ID : r . ID ,
Mutable : r . Mutable ,
InUse : r . InUse ,
Size_ : r . Size ,
Parent : r . Parent ,
UsageCount : int64 ( r . UsageCount ) ,
Description : r . Description ,
CreatedAt : r . CreatedAt ,
LastUsedAt : r . LastUsedAt ,
2018-07-26 19:07:52 +00:00
RecordType : string ( r . RecordType ) ,
2018-07-27 00:53:48 +00:00
Shared : r . Shared ,
2017-11-21 08:08:36 +00:00
} )
}
2017-06-08 00:53:36 +00:00
}
return resp , nil
}
2017-06-08 22:56:44 +00:00
2017-12-27 01:22:50 +00:00
func ( c * Controller ) Prune ( req * controlapi . PruneRequest , stream controlapi . Control_PruneServer ) error {
2019-05-28 23:14:34 +00:00
if atomic . LoadInt64 ( & c . buildCount ) == 0 {
imageutil . CancelCacheLeases ( )
}
2017-12-27 01:22:50 +00:00
ch := make ( chan client . UsageInfo )
eg , ctx := errgroup . WithContext ( stream . Context ( ) )
workers , err := c . opt . WorkerController . List ( )
if err != nil {
return errors . Wrap ( err , "failed to list workers for prune" )
}
2018-07-23 23:27:01 +00:00
didPrune := false
defer func ( ) {
if didPrune {
if c , ok := c . cache . ( interface {
ReleaseUnreferenced ( ) error
} ) ; ok {
if err := c . ReleaseUnreferenced ( ) ; err != nil {
2018-09-14 05:35:38 +00:00
logrus . Errorf ( "failed to release cache metadata: %+v" , err )
2018-07-23 23:27:01 +00:00
}
}
}
} ( )
2017-12-27 01:22:50 +00:00
for _ , w := range workers {
func ( w worker . Worker ) {
eg . Go ( func ( ) error {
2018-07-26 00:20:57 +00:00
return w . Prune ( ctx , ch , client . PruneInfo {
2018-07-31 20:14:53 +00:00
Filter : req . Filter ,
All : req . All ,
KeepDuration : time . Duration ( req . KeepDuration ) ,
KeepBytes : req . KeepBytes ,
2018-07-26 00:20:57 +00:00
} )
2017-12-27 01:22:50 +00:00
} )
} ( w )
}
2018-07-11 17:54:13 +00:00
eg2 , _ := errgroup . WithContext ( stream . Context ( ) )
2017-12-27 01:22:50 +00:00
eg2 . Go ( func ( ) error {
defer close ( ch )
return eg . Wait ( )
} )
eg2 . Go ( func ( ) error {
for r := range ch {
2018-07-23 23:27:01 +00:00
didPrune = true
2017-12-27 01:22:50 +00:00
if err := stream . Send ( & controlapi . UsageRecord {
// TODO: add worker info
ID : r . ID ,
Mutable : r . Mutable ,
InUse : r . InUse ,
Size_ : r . Size ,
Parent : r . Parent ,
UsageCount : int64 ( r . UsageCount ) ,
Description : r . Description ,
CreatedAt : r . CreatedAt ,
LastUsedAt : r . LastUsedAt ,
2018-07-27 00:53:48 +00:00
RecordType : string ( r . RecordType ) ,
Shared : r . Shared ,
2017-12-27 01:22:50 +00:00
} ) ; err != nil {
return err
}
}
return nil
} )
return eg2 . Wait ( )
}
2018-09-11 08:02:46 +00:00
// translateLegacySolveRequest rewrites the deprecated cache fields of req in
// place into their replacements and clears the deprecated fields afterwards.
// It currently always returns nil.
func translateLegacySolveRequest(req *controlapi.SolveRequest) error {
	// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
	if ref := req.Cache.ExportRefDeprecated; ref != "" {
		// The legacy attribute map is reused (not copied) for the new entry.
		attrs := req.Cache.ExportAttrsDeprecated
		if attrs == nil {
			attrs = make(map[string]string)
		}
		attrs["ref"] = ref

		// FIXME(AkihiroSuda): skip append if already exists
		req.Cache.Exports = append(req.Cache.Exports, &controlapi.CacheOptionsEntry{
			Type:  "registry",
			Attrs: attrs,
		})
		req.Cache.ExportRefDeprecated = ""
		req.Cache.ExportAttrsDeprecated = nil
	}

	// translates ImportRefs to new Imports (v0.4.0)
	for _, ref := range req.Cache.ImportRefsDeprecated {
		// FIXME(AkihiroSuda): skip append if already exists
		req.Cache.Imports = append(req.Cache.Imports, &controlapi.CacheOptionsEntry{
			Type:  "registry",
			Attrs: map[string]string{"ref": ref},
		})
	}
	req.Cache.ImportRefsDeprecated = nil

	return nil
}
2018-05-01 01:10:54 +00:00
func ( c * Controller ) Solve ( ctx context . Context , req * controlapi . SolveRequest ) ( * controlapi . SolveResponse , error ) {
2019-05-28 23:14:34 +00:00
atomic . AddInt64 ( & c . buildCount , 1 )
defer atomic . AddInt64 ( & c . buildCount , - 1 )
2018-09-11 08:02:46 +00:00
if err := translateLegacySolveRequest ( req ) ; err != nil {
return nil , err
}
2017-08-03 22:24:02 +00:00
2018-08-30 21:06:27 +00:00
defer func ( ) {
time . AfterFunc ( time . Second , c . throttledGC )
} ( )
2017-07-10 20:03:38 +00:00
var expi exporter . ExporterInstance
2017-11-21 08:08:36 +00:00
// TODO: multiworker
// This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this.
w , err := c . opt . WorkerController . GetDefault ( )
if err != nil {
return nil , err
}
2017-07-10 20:03:38 +00:00
if req . Exporter != "" {
2019-02-23 12:56:04 +00:00
exp , err := w . Exporter ( req . Exporter , c . opt . SessionManager )
2017-12-15 08:06:54 +00:00
if err != nil {
return nil , err
2017-07-10 20:03:38 +00:00
}
expi , err = exp . Resolve ( ctx , req . ExporterAttrs )
if err != nil {
return nil , err
}
}
2018-09-11 08:02:46 +00:00
var (
cacheExporter remotecache . Exporter
cacheExportMode solver . CacheExportMode
cacheImports [ ] frontend . CacheOptionsEntry
)
if len ( req . Cache . Exports ) > 1 {
// TODO(AkihiroSuda): this should be fairly easy
return nil , errors . New ( "specifying multiple cache exports is not supported currently" )
2017-10-13 18:54:26 +00:00
}
2018-09-11 08:02:46 +00:00
if len ( req . Cache . Exports ) == 1 {
e := req . Cache . Exports [ 0 ]
cacheExporterFunc , ok := c . opt . ResolveCacheExporterFuncs [ e . Type ]
if ! ok {
return nil , errors . Errorf ( "unknown cache exporter: %q" , e . Type )
}
2020-06-30 01:06:02 +00:00
cacheExporter , err = cacheExporterFunc ( ctx , session . NewGroup ( req . Session ) , e . Attrs )
2017-10-13 18:54:26 +00:00
if err != nil {
return nil , err
}
2018-09-11 08:02:46 +00:00
cacheExportMode = parseCacheExportMode ( e . Attrs [ "mode" ] )
}
for _ , im := range req . Cache . Imports {
cacheImports = append ( cacheImports , frontend . CacheOptionsEntry {
Type : im . Type ,
Attrs : im . Attrs ,
} )
2017-10-13 18:54:26 +00:00
}
2020-06-30 01:06:02 +00:00
resp , err := c . solver . Solve ( ctx , req . Ref , req . Session , frontend . SolveRequest {
2020-02-07 19:53:18 +00:00
Frontend : req . Frontend ,
Definition : req . Definition ,
FrontendOpt : req . FrontendAttrs ,
2020-02-21 18:19:19 +00:00
FrontendInputs : req . FrontendInputs ,
2020-02-07 19:53:18 +00:00
CacheImports : cacheImports ,
2018-04-13 21:13:48 +00:00
} , llbsolver . ExporterRequest {
2018-05-07 21:51:44 +00:00
Exporter : expi ,
CacheExporter : cacheExporter ,
2018-09-11 08:02:46 +00:00
CacheExportMode : cacheExportMode ,
2018-08-04 19:42:01 +00:00
} , req . Entitlements )
2018-05-03 00:35:07 +00:00
if err != nil {
2017-06-08 22:56:44 +00:00
return nil , err
}
2018-05-03 00:35:07 +00:00
return & controlapi . SolveResponse {
ExporterResponse : resp . ExporterResponse ,
} , nil
2017-06-08 22:56:44 +00:00
}
2017-06-13 21:42:51 +00:00
2017-06-14 00:15:55 +00:00
func ( c * Controller ) Status ( req * controlapi . StatusRequest , stream controlapi . Control_StatusServer ) error {
ch := make ( chan * client . SolveStatus , 8 )
eg , ctx := errgroup . WithContext ( stream . Context ( ) )
eg . Go ( func ( ) error {
return c . solver . Status ( ctx , req . Ref , ch )
} )
eg . Go ( func ( ) error {
for {
2017-09-26 03:57:38 +00:00
ss , ok := <- ch
if ! ok {
return nil
}
2020-10-26 21:49:04 +00:00
logSize := 0
retry := false
for {
sr := controlapi . StatusResponse { }
for _ , v := range ss . Vertexes {
sr . Vertexes = append ( sr . Vertexes , & controlapi . Vertex {
Digest : v . Digest ,
Inputs : v . Inputs ,
Name : v . Name ,
Started : v . Started ,
Completed : v . Completed ,
Error : v . Error ,
Cached : v . Cached ,
} )
}
for _ , v := range ss . Statuses {
sr . Statuses = append ( sr . Statuses , & controlapi . VertexStatus {
ID : v . ID ,
Vertex : v . Vertex ,
Name : v . Name ,
Current : v . Current ,
Total : v . Total ,
Timestamp : v . Timestamp ,
Started : v . Started ,
Completed : v . Completed ,
} )
}
for i , v := range ss . Logs {
sr . Logs = append ( sr . Logs , & controlapi . VertexLog {
Vertex : v . Vertex ,
Stream : int64 ( v . Stream ) ,
Msg : v . Data ,
Timestamp : v . Timestamp ,
} )
logSize += len ( v . Data )
// avoid logs growing big and split apart if they do
if logSize > 1024 * 1024 {
ss . Vertexes = nil
ss . Statuses = nil
ss . Logs = ss . Logs [ i + 1 : ]
retry = true
break
}
}
if err := stream . SendMsg ( & sr ) ; err != nil {
return err
}
if ! retry {
break
}
2017-06-14 00:15:55 +00:00
}
}
} )
return eg . Wait ( )
}
2017-07-11 17:12:12 +00:00
func ( c * Controller ) Session ( stream controlapi . Control_SessionServer ) error {
logrus . Debugf ( "session started" )
2018-01-06 16:54:10 +00:00
conn , closeCh , opts := grpchijack . Hijack ( stream )
2017-07-11 17:12:12 +00:00
defer conn . Close ( )
2018-01-06 16:54:10 +00:00
ctx , cancel := context . WithCancel ( stream . Context ( ) )
go func ( ) {
<- closeCh
cancel ( )
} ( )
err := c . opt . SessionManager . HandleConn ( ctx , conn , opts )
2017-07-11 17:12:12 +00:00
logrus . Debugf ( "session finished: %v" , err )
return err
}
2017-12-19 09:34:34 +00:00
2018-05-01 01:10:54 +00:00
func ( c * Controller ) ListWorkers ( ctx context . Context , r * controlapi . ListWorkersRequest ) ( * controlapi . ListWorkersResponse , error ) {
2017-12-19 09:34:34 +00:00
resp := & controlapi . ListWorkersResponse { }
workers , err := c . opt . WorkerController . List ( r . Filter ... )
if err != nil {
return nil , err
}
for _ , w := range workers {
2018-06-30 01:35:39 +00:00
resp . Record = append ( resp . Record , & apitypes . WorkerRecord {
2018-06-22 02:06:12 +00:00
ID : w . ID ( ) ,
Labels : w . Labels ( ) ,
2020-02-25 23:23:34 +00:00
Platforms : pb . PlatformsFromSpec ( w . Platforms ( true ) ) ,
2018-08-30 21:06:27 +00:00
GCPolicy : toPBGCPolicy ( w . GCPolicy ( ) ) ,
2017-12-19 09:34:34 +00:00
} )
}
return resp , nil
}
2018-05-07 21:51:44 +00:00
2018-08-30 21:06:27 +00:00
// gc runs the configured GC policy of every worker concurrently and logs the
// total number of bytes reported as pruned. Workers without a GC policy are
// skipped. Runs are serialised by gcmu. Errors are logged, never returned.
func (c *Controller) gc() {
	c.gcmu.Lock()
	defer c.gcmu.Unlock()

	workers, err := c.opt.WorkerController.List()
	if err != nil {
		// Previously dropped silently; surface it so a failing GC is visible.
		logrus.Errorf("gc error: failed to list workers: %+v", err)
		return
	}

	eg, ctx := errgroup.WithContext(context.TODO())

	// size is only written by the collector goroutine and only read after
	// done has been closed.
	var size int64
	ch := make(chan client.UsageInfo)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for ui := range ch {
			size += ui.Size
		}
	}()

	for _, w := range workers {
		func(w worker.Worker) {
			eg.Go(func() error {
				if policy := w.GCPolicy(); len(policy) > 0 {
					return w.Prune(ctx, ch, policy...)
				}
				return nil
			})
		}(w)
	}

	// All senders have returned once Wait does, so closing ch is safe.
	err = eg.Wait()
	close(ch)
	if err != nil {
		logrus.Errorf("gc error: %+v", err)
	}

	<-done
	if size > 0 {
		logrus.Debugf("gc cleaned up %d bytes", size)
	}
}
2018-09-11 08:02:46 +00:00
func parseCacheExportMode ( mode string ) solver . CacheExportMode {
switch mode {
case "min" :
return solver . CacheExportModeMin
case "max" :
return solver . CacheExportModeMax
case "" :
default :
logrus . Debugf ( "skipping invalid cache export mode: %s" , mode )
2018-05-07 21:51:44 +00:00
}
return solver . CacheExportModeMin
}
2018-08-30 21:06:27 +00:00
// toPBGCPolicy converts client prune rules into their API representation,
// preserving order. The result is always a non-nil slice with one entry per
// input rule.
func toPBGCPolicy(in []client.PruneInfo) []*apitypes.GCPolicy {
	out := make([]*apitypes.GCPolicy, len(in))
	for i := range in {
		rule := in[i]
		out[i] = &apitypes.GCPolicy{
			All:          rule.All,
			KeepBytes:    rule.KeepBytes,
			KeepDuration: int64(rule.KeepDuration),
			Filters:      rule.Filter,
		}
	}
	return out
}