2017-06-08 00:53:36 +00:00
package control
import (
2018-01-16 22:30:10 +00:00
"context"
2017-10-13 18:54:26 +00:00
"github.com/docker/distribution/reference"
2017-06-22 20:15:46 +00:00
controlapi "github.com/moby/buildkit/api/services/control"
2018-06-30 01:35:39 +00:00
apitypes "github.com/moby/buildkit/api/types"
2018-05-07 21:24:28 +00:00
"github.com/moby/buildkit/cache/remotecache"
2017-06-22 20:15:46 +00:00
"github.com/moby/buildkit/client"
2017-07-10 20:03:38 +00:00
"github.com/moby/buildkit/exporter"
2017-08-25 20:08:18 +00:00
"github.com/moby/buildkit/frontend"
2017-07-11 17:12:12 +00:00
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/grpchijack"
2018-05-11 05:58:41 +00:00
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
2018-06-22 02:06:12 +00:00
"github.com/moby/buildkit/solver/pb"
2017-06-22 20:15:46 +00:00
"github.com/moby/buildkit/worker"
2017-06-09 01:16:19 +00:00
"github.com/pkg/errors"
2017-07-19 01:05:19 +00:00
"github.com/sirupsen/logrus"
2017-06-14 00:15:55 +00:00
"golang.org/x/sync/errgroup"
2017-06-08 00:53:36 +00:00
"google.golang.org/grpc"
)
2018-07-03 09:59:33 +00:00
type ResolveCacheExporterFunc func ( ctx context . Context , typ , target string ) ( remotecache . Exporter , error )
2017-06-08 00:53:36 +00:00
type Opt struct {
2018-07-03 09:59:33 +00:00
SessionManager * session . Manager
WorkerController * worker . Controller
Frontends map [ string ] frontend . Frontend
CacheKeyStorage solver . CacheKeyStorage
ResolveCacheExporterFunc remotecache . ResolveCacheExporterFunc
ResolveCacheImporterFunc remotecache . ResolveCacheImporterFunc
2017-06-08 00:53:36 +00:00
}
2017-06-08 22:56:44 +00:00
type Controller struct { // TODO: ControlService
opt Opt
2018-04-13 21:13:48 +00:00
solver * llbsolver . Solver
2018-07-23 23:27:01 +00:00
cache solver . CacheManager
2017-06-08 00:53:36 +00:00
}
func NewController ( opt Opt ) ( * Controller , error ) {
2018-07-23 23:27:01 +00:00
cache := solver . NewCacheManager ( "local" , opt . CacheKeyStorage , worker . NewCacheResultStorage ( opt . WorkerController ) )
solver , err := llbsolver . New ( opt . WorkerController , opt . Frontends , cache , opt . ResolveCacheImporterFunc )
2018-06-23 00:31:55 +00:00
if err != nil {
return nil , errors . Wrap ( err , "failed to create solver" )
}
2018-04-13 21:13:48 +00:00
2017-06-08 00:53:36 +00:00
c := & Controller {
2018-04-13 21:13:48 +00:00
opt : opt ,
solver : solver ,
2018-07-23 23:27:01 +00:00
cache : cache ,
2017-06-08 00:53:36 +00:00
}
return c , nil
}
func ( c * Controller ) Register ( server * grpc . Server ) error {
controlapi . RegisterControlServer ( server , c )
return nil
}
2018-05-01 01:10:54 +00:00
func ( c * Controller ) DiskUsage ( ctx context . Context , r * controlapi . DiskUsageRequest ) ( * controlapi . DiskUsageResponse , error ) {
2017-06-08 00:53:36 +00:00
resp := & controlapi . DiskUsageResponse { }
2017-12-19 09:34:34 +00:00
workers , err := c . opt . WorkerController . List ( )
if err != nil {
return nil , err
}
for _ , w := range workers {
2017-12-15 08:06:54 +00:00
du , err := w . DiskUsage ( ctx , client . DiskUsageInfo {
2017-11-21 08:08:36 +00:00
Filter : r . Filter ,
2017-06-08 00:53:36 +00:00
} )
2017-11-21 08:08:36 +00:00
if err != nil {
return nil , err
}
for _ , r := range du {
resp . Record = append ( resp . Record , & controlapi . UsageRecord {
// TODO: add worker info
ID : r . ID ,
Mutable : r . Mutable ,
InUse : r . InUse ,
Size_ : r . Size ,
Parent : r . Parent ,
UsageCount : int64 ( r . UsageCount ) ,
Description : r . Description ,
CreatedAt : r . CreatedAt ,
LastUsedAt : r . LastUsedAt ,
} )
}
2017-06-08 00:53:36 +00:00
}
return resp , nil
}
2017-06-08 22:56:44 +00:00
2017-12-27 01:22:50 +00:00
func ( c * Controller ) Prune ( req * controlapi . PruneRequest , stream controlapi . Control_PruneServer ) error {
ch := make ( chan client . UsageInfo )
eg , ctx := errgroup . WithContext ( stream . Context ( ) )
workers , err := c . opt . WorkerController . List ( )
if err != nil {
return errors . Wrap ( err , "failed to list workers for prune" )
}
2018-07-23 23:27:01 +00:00
didPrune := false
defer func ( ) {
if didPrune {
if c , ok := c . cache . ( interface {
ReleaseUnreferenced ( ) error
} ) ; ok {
if err := c . ReleaseUnreferenced ( ) ; err != nil {
logrus . Errorf ( "failed to release cache metadata: %+v" )
}
}
}
} ( )
2017-12-27 01:22:50 +00:00
for _ , w := range workers {
func ( w worker . Worker ) {
eg . Go ( func ( ) error {
return w . Prune ( ctx , ch )
} )
} ( w )
}
2018-07-11 17:54:13 +00:00
eg2 , _ := errgroup . WithContext ( stream . Context ( ) )
2017-12-27 01:22:50 +00:00
eg2 . Go ( func ( ) error {
defer close ( ch )
return eg . Wait ( )
} )
eg2 . Go ( func ( ) error {
for r := range ch {
2018-07-23 23:27:01 +00:00
didPrune = true
2017-12-27 01:22:50 +00:00
if err := stream . Send ( & controlapi . UsageRecord {
// TODO: add worker info
ID : r . ID ,
Mutable : r . Mutable ,
InUse : r . InUse ,
Size_ : r . Size ,
Parent : r . Parent ,
UsageCount : int64 ( r . UsageCount ) ,
Description : r . Description ,
CreatedAt : r . CreatedAt ,
LastUsedAt : r . LastUsedAt ,
} ) ; err != nil {
return err
}
}
return nil
} )
return eg2 . Wait ( )
}
2018-05-01 01:10:54 +00:00
func ( c * Controller ) Solve ( ctx context . Context , req * controlapi . SolveRequest ) ( * controlapi . SolveResponse , error ) {
2017-08-03 22:24:02 +00:00
ctx = session . NewContext ( ctx , req . Session )
2017-07-10 20:03:38 +00:00
var expi exporter . ExporterInstance
2017-11-21 08:08:36 +00:00
// TODO: multiworker
// This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this.
w , err := c . opt . WorkerController . GetDefault ( )
if err != nil {
return nil , err
}
2017-07-10 20:03:38 +00:00
if req . Exporter != "" {
2017-12-15 08:06:54 +00:00
exp , err := w . Exporter ( req . Exporter )
if err != nil {
return nil , err
2017-07-10 20:03:38 +00:00
}
expi , err = exp . Resolve ( ctx , req . ExporterAttrs )
if err != nil {
return nil , err
}
}
2018-07-03 09:59:33 +00:00
var cacheExporter remotecache . Exporter
if ref := req . Cache . ExportRef ; ref != "" && c . opt . ResolveCacheExporterFunc != nil {
2017-10-13 18:54:26 +00:00
parsed , err := reference . ParseNormalizedNamed ( ref )
if err != nil {
return nil , err
}
2018-04-13 21:13:48 +00:00
exportCacheRef := reference . TagNameOnly ( parsed ) . String ( )
2018-07-03 09:59:33 +00:00
typ := "" // unimplemented yet (typically registry)
cacheExporter , err = c . opt . ResolveCacheExporterFunc ( ctx , typ , exportCacheRef )
if err != nil {
return nil , err
}
2017-10-13 18:54:26 +00:00
}
2018-04-25 17:49:15 +00:00
var importCacheRefs [ ] string
for _ , ref := range req . Cache . ImportRefs {
2017-10-13 18:54:26 +00:00
parsed , err := reference . ParseNormalizedNamed ( ref )
if err != nil {
return nil , err
}
2018-04-25 17:49:15 +00:00
importCacheRefs = append ( importCacheRefs , reference . TagNameOnly ( parsed ) . String ( ) )
2017-10-13 18:54:26 +00:00
}
2018-04-25 17:49:15 +00:00
resp , err := c . solver . Solve ( ctx , req . Ref , frontend . SolveRequest {
Frontend : req . Frontend ,
Definition : req . Definition ,
FrontendOpt : req . FrontendAttrs ,
ImportCacheRefs : importCacheRefs ,
2018-04-13 21:13:48 +00:00
} , llbsolver . ExporterRequest {
2018-05-07 21:51:44 +00:00
Exporter : expi ,
CacheExporter : cacheExporter ,
CacheExportMode : parseCacheExporterOpt ( req . Cache . ExportAttrs ) ,
2018-05-03 00:35:07 +00:00
} )
if err != nil {
2017-06-08 22:56:44 +00:00
return nil , err
}
2018-05-03 00:35:07 +00:00
return & controlapi . SolveResponse {
ExporterResponse : resp . ExporterResponse ,
} , nil
2017-06-08 22:56:44 +00:00
}
2017-06-13 21:42:51 +00:00
2017-06-14 00:15:55 +00:00
func ( c * Controller ) Status ( req * controlapi . StatusRequest , stream controlapi . Control_StatusServer ) error {
ch := make ( chan * client . SolveStatus , 8 )
eg , ctx := errgroup . WithContext ( stream . Context ( ) )
eg . Go ( func ( ) error {
return c . solver . Status ( ctx , req . Ref , ch )
} )
eg . Go ( func ( ) error {
for {
2017-09-26 03:57:38 +00:00
ss , ok := <- ch
if ! ok {
return nil
}
sr := controlapi . StatusResponse { }
for _ , v := range ss . Vertexes {
sr . Vertexes = append ( sr . Vertexes , & controlapi . Vertex {
Digest : v . Digest ,
Inputs : v . Inputs ,
Name : v . Name ,
Started : v . Started ,
Completed : v . Completed ,
Error : v . Error ,
Cached : v . Cached ,
} )
}
for _ , v := range ss . Statuses {
sr . Statuses = append ( sr . Statuses , & controlapi . VertexStatus {
ID : v . ID ,
Vertex : v . Vertex ,
Name : v . Name ,
Current : v . Current ,
Total : v . Total ,
Timestamp : v . Timestamp ,
Started : v . Started ,
Completed : v . Completed ,
} )
}
for _ , v := range ss . Logs {
sr . Logs = append ( sr . Logs , & controlapi . VertexLog {
Vertex : v . Vertex ,
Stream : int64 ( v . Stream ) ,
Msg : v . Data ,
Timestamp : v . Timestamp ,
} )
}
if err := stream . SendMsg ( & sr ) ; err != nil {
return err
2017-06-14 00:15:55 +00:00
}
}
} )
return eg . Wait ( )
}
2017-07-11 17:12:12 +00:00
func ( c * Controller ) Session ( stream controlapi . Control_SessionServer ) error {
logrus . Debugf ( "session started" )
2018-01-06 16:54:10 +00:00
conn , closeCh , opts := grpchijack . Hijack ( stream )
2017-07-11 17:12:12 +00:00
defer conn . Close ( )
2018-01-06 16:54:10 +00:00
ctx , cancel := context . WithCancel ( stream . Context ( ) )
go func ( ) {
<- closeCh
cancel ( )
} ( )
err := c . opt . SessionManager . HandleConn ( ctx , conn , opts )
2017-07-11 17:12:12 +00:00
logrus . Debugf ( "session finished: %v" , err )
return err
}
2017-12-19 09:34:34 +00:00
2018-05-01 01:10:54 +00:00
func ( c * Controller ) ListWorkers ( ctx context . Context , r * controlapi . ListWorkersRequest ) ( * controlapi . ListWorkersResponse , error ) {
2017-12-19 09:34:34 +00:00
resp := & controlapi . ListWorkersResponse { }
workers , err := c . opt . WorkerController . List ( r . Filter ... )
if err != nil {
return nil , err
}
for _ , w := range workers {
2018-06-30 01:35:39 +00:00
resp . Record = append ( resp . Record , & apitypes . WorkerRecord {
2018-06-22 02:06:12 +00:00
ID : w . ID ( ) ,
Labels : w . Labels ( ) ,
2018-06-30 01:35:39 +00:00
Platforms : pb . PlatformsFromSpec ( w . Platforms ( ) ) ,
2017-12-19 09:34:34 +00:00
} )
}
return resp , nil
}
2018-05-07 21:51:44 +00:00
func parseCacheExporterOpt ( opt map [ string ] string ) solver . CacheExportMode {
for k , v := range opt {
switch k {
case "mode" :
switch v {
case "min" :
return solver . CacheExportModeMin
case "max" :
return solver . CacheExportModeMax
default :
logrus . Debugf ( "skipping incalid cache export mode: %s" , v )
}
default :
logrus . Warnf ( "skipping invalid cache export opt: %s" , v )
}
}
return solver . CacheExportModeMin
}