From 774db61655894d6b5e4f76e98ff09997ceece263 Mon Sep 17 00:00:00 2001 From: Mzack9999 Date: Wed, 3 Apr 2024 18:50:46 +0200 Subject: [PATCH] lightweight adaptivity on workpool --- internal/runner/inputs.go | 14 ++++++-------- pkg/core/engine.go | 13 +++++++++---- pkg/core/execute_options.go | 4 +++- pkg/core/executors.go | 6 ++++++ pkg/core/workpool.go | 25 +++++++++++++++++++++++++ pkg/js/compiler/pool.go | 5 +++++ 6 files changed, 54 insertions(+), 13 deletions(-) diff --git a/internal/runner/inputs.go b/internal/runner/inputs.go index 75a86991..60aa0319 100644 --- a/internal/runner/inputs.go +++ b/internal/runner/inputs.go @@ -15,12 +15,11 @@ import ( syncutil "github.com/projectdiscovery/utils/sync" ) -const probeBulkSize = 50 +var GlobalProbeBulkSize = 50 // initializeTemplatesHTTPInput initializes the http form of input // for any loaded http templates if input is in non-standard format. func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) { - hm, err := hybrid.New(hybrid.DefaultDiskOptions) if err != nil { return nil, errors.Wrap(err, "could not create temporary input file") @@ -31,11 +30,6 @@ func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) { } gologger.Info().Msgf("Running httpx on input host") - var bulkSize = probeBulkSize - if r.options.BulkSize > probeBulkSize { - bulkSize = r.options.BulkSize - } - httpxOptions := httpx.DefaultOptions httpxOptions.RetryMax = r.options.Retries httpxOptions.Timeout = time.Duration(r.options.Timeout) * time.Second @@ -45,7 +39,7 @@ func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) { } // Probe the non-standard URLs and store them in cache - swg, err := syncutil.New(syncutil.WithSize(bulkSize)) + swg, err := syncutil.New(syncutil.WithSize(GlobalProbeBulkSize)) if err != nil { return nil, errors.Wrap(err, "could not create adaptive group") } @@ -55,6 +49,10 @@ func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) { return true } + if swg.Size != GlobalProbeBulkSize { + swg.Resize(GlobalProbeBulkSize) + } + swg.Add() go func(input *contextargs.MetaInput) { defer swg.Done() diff --git a/pkg/core/engine.go b/pkg/core/engine.go index 93915bc2..4dfb8e0b 100644 --- a/pkg/core/engine.go +++ b/pkg/core/engine.go @@ -30,14 +30,19 @@ func New(options *types.Options) *Engine { return engine } -// GetWorkPool returns a workpool from options -func (e *Engine) GetWorkPool() *WorkPool { - return NewWorkPool(WorkPoolConfig{ +func (e *Engine) GetWorkPoolConfig() WorkPoolConfig { + config := WorkPoolConfig{ InputConcurrency: e.options.BulkSize, TypeConcurrency: e.options.TemplateThreads, HeadlessInputConcurrency: e.options.HeadlessBulkSize, HeadlessTypeConcurrency: e.options.HeadlessTemplateThreads, - }) + } + return config +} + +// GetWorkPool returns a workpool from options +func (e *Engine) GetWorkPool() *WorkPool { + return NewWorkPool(e.GetWorkPoolConfig()) } // SetExecuterOptions sets the executer options for the engine. This is required diff --git a/pkg/core/execute_options.go b/pkg/core/execute_options.go index 580b8b0a..93f197fc 100644 --- a/pkg/core/execute_options.go +++ b/pkg/core/execute_options.go @@ -108,8 +108,10 @@ func (e *Engine) executeTemplateSpray(templatesList []*templates.Template, targe wp := e.GetWorkPool() for _, template := range templatesList { - templateType := template.Type() + // resize check point - nop if there are no changes + wp.RefreshWithConfig(e.GetWorkPoolConfig()) + templateType := template.Type() var wg *syncutil.AdaptiveWaitGroup if templateType == types.HeadlessProtocol { wg = wp.Headless diff --git a/pkg/core/executors.go b/pkg/core/executors.go index ace7acb2..a6421cd9 100644 --- a/pkg/core/executors.go +++ b/pkg/core/executors.go @@ -158,6 +158,9 @@ func (e *Engine) executeTemplatesOnTarget(alltemplates []*templates.Template, ta wp := e.GetWorkPool() for _, tpl := range alltemplates { + // resize check point - nop if there are no changes + wp.RefreshWithConfig(e.GetWorkPoolConfig()) + var sg *syncutil.AdaptiveWaitGroup if tpl.Type() == types.HeadlessProtocol { sg = wp.Headless @@ -213,6 +216,9 @@ func (e *ChildExecuter) Close() *atomic.Bool { func (e *ChildExecuter) Execute(template *templates.Template, value *contextargs.MetaInput) { templateType := template.Type() + // resize check point - nop if there are no changes + e.e.workPool.RefreshWithConfig(e.e.GetWorkPoolConfig()) + var wg *syncutil.AdaptiveWaitGroup if templateType == types.HeadlessProtocol { wg = e.e.workPool.Headless diff --git a/pkg/core/workpool.go b/pkg/core/workpool.go index cd17a0b3..810f9939 100644 --- a/pkg/core/workpool.go +++ b/pkg/core/workpool.go @@ -57,3 +57,28 @@ func (w *WorkPool) InputPool(templateType types.ProtocolType) *syncutil.Adaptive swg, _ := syncutil.New(syncutil.WithSize(count)) return swg } + +func (w *WorkPool) RefreshWithConfig(config WorkPoolConfig) { + if w.config.TypeConcurrency != config.TypeConcurrency { + w.config.TypeConcurrency = config.TypeConcurrency + } + if w.config.HeadlessTypeConcurrency != config.HeadlessTypeConcurrency { + w.config.HeadlessTypeConcurrency = config.HeadlessTypeConcurrency + } + if w.config.InputConcurrency != config.InputConcurrency { + w.config.InputConcurrency = config.InputConcurrency + } + if w.config.HeadlessInputConcurrency != config.HeadlessInputConcurrency { + w.config.HeadlessInputConcurrency = config.HeadlessInputConcurrency + } + w.Refresh() +} + +func (w *WorkPool) Refresh() { + if w.Default.Size != w.config.TypeConcurrency { + w.Default.Resize(w.config.TypeConcurrency) + } + if w.Headless.Size != w.config.HeadlessTypeConcurrency { + w.Headless.Resize(w.config.HeadlessTypeConcurrency) + } +} diff --git a/pkg/js/compiler/pool.go b/pkg/js/compiler/pool.go index fc0b6163..2b3bc217 100644 --- a/pkg/js/compiler/pool.go +++ b/pkg/js/compiler/pool.go @@ -100,6 +100,11 @@ func executeWithRuntime(runtime *goja.Runtime, p *goja.Program, args *ExecuteArg // ExecuteProgram executes a compiled program with the default options. // it deligates if a particular program should run in a pooled or non-pooled runtime func ExecuteProgram(p *goja.Program, args *ExecuteArgs, opts *ExecuteOptions) (result goja.Value, err error) { + // resize check point + if pooljsc.Size != PoolingJsVmConcurrency { + pooljsc.Resize(PoolingJsVmConcurrency) + } + if opts.Source == nil { // not-recommended anymore return executeWithoutPooling(p, args, opts)