pdcp result chunked upload (#4662)

* chunked pdcp cloud results upload

* add -sid option to specify scanid

* fix scan result append endpoint
dev
Tarun Koyalwar 2024-01-21 02:26:16 +05:30 committed by GitHub
parent f7ba2390bf
commit 29b69a12ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 197 additions and 85 deletions

View File

@ -290,8 +290,9 @@ STATISTICS:
-mp, -metrics-port int port to expose nuclei metrics on (default 9092) -mp, -metrics-port int port to expose nuclei metrics on (default 9092)
CLOUD: CLOUD:
-auth configure projectdiscovery cloud (pdcp) api key -auth configure projectdiscovery cloud (pdcp) api key
-cup, -cloud-upload upload scan results to pdcp dashboard -cup, -cloud-upload upload scan results to pdcp dashboard
-sid, -scan-id string upload scan results to given scan id
EXAMPLES: EXAMPLES:

View File

@ -293,9 +293,10 @@ UNCOVER引擎:
-si, -stats-inerval int 显示统计信息更新的间隔秒数默认5 -si, -stats-inerval int 显示统计信息更新的间隔秒数默认5
-mp, -metrics-port int 更改metrics服务的端口默认9092 -mp, -metrics-port int 更改metrics服务的端口默认9092
云服务: 云服务:
-auth 配置projectdiscovery云pdcpAPI密钥 -auth 配置projectdiscovery云服务pdcpAPI密钥
-cup, -cloud-upload 将扫描结果上传到pdcp仪表板 -cup, -cloud-upload 将扫描结果上传到pdcp仪表板
-sid, -scan-id string 将扫描结果上传到指定的扫描ID
例子: 例子:
扫描一个单独的URL: 扫描一个单独的URL:

View File

@ -266,8 +266,9 @@ STATISTICS:
-mp, -metrics-port int port to expose nuclei metrics on (default 9092) -mp, -metrics-port int port to expose nuclei metrics on (default 9092)
CLOUD: CLOUD:
-auth configure projectdiscovery cloud (pdcp) api key -auth configure projectdiscovery cloud (pdcp) api key
-cup, -cloud-upload upload scan results to pdcp dashboard -cup, -cloud-upload upload scan results to pdcp dashboard
-sid, -scan-id string upload scan results to given scan id
EXAMPLES: EXAMPLES:

View File

@ -264,8 +264,9 @@ STATISTICS:
-mp, -metrics-port int nuclei 메트릭스를 노출할 포트 (기본값 9092) -mp, -metrics-port int nuclei 메트릭스를 노출할 포트 (기본값 9092)
CLOUD: CLOUD:
-auth projectdiscovery cloud (pdcp) api 키 설정 -auth projectdiscovery 클라우드 (pdcp) API 키 구성
-cup, -cloud-upload 스캔 결과를 pdcp 대시보드에 업로드 -cup, -cloud-upload 스캔 결과를 pdcp 대시보드에 업로드
-sid, -scan-id string 주어진 스캔 ID에 스캔 결과 업로드
예시: 예시:

View File

@ -127,6 +127,9 @@ func main() {
defer cancel() defer cancel()
stackMonitor.RegisterCallback(func(dumpID string) error { stackMonitor.RegisterCallback(func(dumpID string) error {
resumeFileName := fmt.Sprintf("crash-resume-file-%s.dump", dumpID) resumeFileName := fmt.Sprintf("crash-resume-file-%s.dump", dumpID)
if options.EnableCloudUpload {
gologger.Info().Msgf("Uploading scan results to cloud...")
}
nucleiRunner.Close() nucleiRunner.Close()
gologger.Info().Msgf("Creating resume file: %s\n", resumeFileName) gologger.Info().Msgf("Creating resume file: %s\n", resumeFileName)
err := nucleiRunner.SaveResumeConfig(resumeFileName) err := nucleiRunner.SaveResumeConfig(resumeFileName)
@ -143,6 +146,9 @@ func main() {
for range c { for range c {
gologger.Info().Msgf("CTRL+C pressed: Exiting\n") gologger.Info().Msgf("CTRL+C pressed: Exiting\n")
gologger.Info().Msgf("Attempting graceful shutdown...") gologger.Info().Msgf("Attempting graceful shutdown...")
if options.EnableCloudUpload {
gologger.Info().Msgf("Uploading scan results to cloud...")
}
nucleiRunner.Close() nucleiRunner.Close()
if options.ShouldSaveResume() { if options.ShouldSaveResume() {
gologger.Info().Msgf("Creating resume file: %s\n", resumeFileName) gologger.Info().Msgf("Creating resume file: %s\n", resumeFileName)
@ -380,6 +386,7 @@ on extensive configurability, massive extensibility and ease of use.`)
flagSet.CreateGroup("cloud", "Cloud", flagSet.CreateGroup("cloud", "Cloud",
flagSet.BoolVar(&pdcpauth, "auth", false, "configure projectdiscovery cloud (pdcp) api key"), flagSet.BoolVar(&pdcpauth, "auth", false, "configure projectdiscovery cloud (pdcp) api key"),
flagSet.BoolVarP(&options.EnableCloudUpload, "cloud-upload", "cup", false, "upload scan results to pdcp dashboard"), flagSet.BoolVarP(&options.EnableCloudUpload, "cloud-upload", "cup", false, "upload scan results to pdcp dashboard"),
flagSet.StringVarP(&options.ScanID, "scan-id", "sid", "", "upload scan results to given scan id"),
) )
flagSet.SetCustomHelpText(`EXAMPLES: flagSet.SetCustomHelpText(`EXAMPLES:

18
internal/pdcp/utils.go Normal file
View File

@ -0,0 +1,18 @@
package pdcp
import (
pdcpauth "github.com/projectdiscovery/utils/auth/pdcp"
urlutil "github.com/projectdiscovery/utils/url"
)
func getScanDashBoardURL(id string) string {
ux, _ := urlutil.Parse(pdcpauth.DashBoardURL)
ux.Path = "/scans/" + id
ux.Update()
return ux.String()
}
type uploadResponse struct {
ID string `json:"id"`
Message string `json:"message"`
}

View File

@ -1,30 +1,30 @@
package pdcp package pdcp
import ( import (
"bufio"
"bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"os"
"path/filepath"
"strconv"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/projectdiscovery/gologger" "github.com/projectdiscovery/gologger"
"github.com/projectdiscovery/nuclei/v3/pkg/catalog/config"
"github.com/projectdiscovery/nuclei/v3/pkg/output" "github.com/projectdiscovery/nuclei/v3/pkg/output"
"github.com/projectdiscovery/retryablehttp-go" "github.com/projectdiscovery/retryablehttp-go"
pdcpauth "github.com/projectdiscovery/utils/auth/pdcp" pdcpauth "github.com/projectdiscovery/utils/auth/pdcp"
errorutil "github.com/projectdiscovery/utils/errors" errorutil "github.com/projectdiscovery/utils/errors"
fileutil "github.com/projectdiscovery/utils/file"
folderutil "github.com/projectdiscovery/utils/folder"
urlutil "github.com/projectdiscovery/utils/url" urlutil "github.com/projectdiscovery/utils/url"
) )
const ( const (
uploadEndpoint = "/v1/scans/import" uploadEndpoint = "/v1/scans/import"
appendEndpoint = "/v1/scans/%s/import"
flushTimer = time.Duration(1) * time.Minute
MaxChunkSize = 1024 * 1024 * 4 // 4 MB
) )
var _ output.Writer = &UploadWriter{} var _ output.Writer = &UploadWriter{}
@ -34,31 +34,28 @@ var _ output.Writer = &UploadWriter{}
type UploadWriter struct { type UploadWriter struct {
*output.StandardWriter *output.StandardWriter
creds *pdcpauth.PDCPCredentials creds *pdcpauth.PDCPCredentials
tempFile *os.File
done atomic.Bool
uploadURL *url.URL uploadURL *url.URL
client *retryablehttp.Client
cancel context.CancelFunc
done chan struct{}
scanID string
counter atomic.Int32
} }
// NewUploadWriter creates a new upload writer // NewUploadWriter creates a new upload writer
func NewUploadWriter(creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) { func NewUploadWriter(ctx context.Context, creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) {
if creds == nil { if creds == nil {
return nil, fmt.Errorf("no credentials provided") return nil, fmt.Errorf("no credentials provided")
} }
u := &UploadWriter{creds: creds} u := &UploadWriter{
// create a temporary file in cache directory creds: creds,
cacheDir := folderutil.AppCacheDirOrDefault("", config.BinaryName) done: make(chan struct{}, 1),
if !fileutil.FolderExists(cacheDir) {
_ = fileutil.CreateFolder(cacheDir)
} }
var err error var err error
// tempfile is created in nuclei-results-<unix-timestamp>.json format reader, writer := io.Pipe()
u.tempFile, err = os.OpenFile(filepath.Join(cacheDir, "nuclei-results-"+strconv.Itoa(int(time.Now().Unix()))+".json"), os.O_RDWR|os.O_CREATE, 0600) // create standard writer
if err != nil {
return nil, errorutil.NewWithErr(err).Msgf("could not create temporary file")
}
u.StandardWriter, err = output.NewWriter( u.StandardWriter, err = output.NewWriter(
output.WithWriter(u.tempFile), output.WithWriter(writer),
output.WithJson(true, true), output.WithJson(true, true),
) )
if err != nil { if err != nil {
@ -71,87 +68,164 @@ func NewUploadWriter(creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) {
tmp.Path = uploadEndpoint tmp.Path = uploadEndpoint
tmp.Update() tmp.Update()
u.uploadURL = tmp.URL u.uploadURL = tmp.URL
// create http client
opts := retryablehttp.DefaultOptionsSingle
opts.NoAdjustTimeout = true
opts.Timeout = time.Duration(3) * time.Minute
u.client = retryablehttp.NewClient(opts)
// create context
ctx, u.cancel = context.WithCancel(ctx)
// start auto commit
// upload every 1 minute or when buffer is full
go u.autoCommit(ctx, reader)
return u, nil return u, nil
} }
type uploadResponse struct { // SetScanID sets the scan id for the upload writer
ID string `json:"id"` func (u *UploadWriter) SetScanID(id string) {
Message string `json:"message"` u.scanID = id
} }
// Upload uploads the results to pdcp server func (u *UploadWriter) autoCommit(ctx context.Context, r *io.PipeReader) {
func (u *UploadWriter) Upload() { reader := bufio.NewReader(r)
defer u.done.Store(true) ch := make(chan string, 4)
_ = u.tempFile.Sync() // continuously read from the reader and send to channel
info, err := u.tempFile.Stat() go func() {
if err != nil { defer r.Close()
gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err) defer close(ch)
return for {
} data, err := reader.ReadString('\n')
if info.Size() == 0 { if err != nil {
gologger.Verbose().Msgf("Scan results upload to cloud skipped, no results found to upload") return
return }
} u.counter.Add(1)
_, _ = u.tempFile.Seek(0, 0) ch <- data
}
}()
id, err := u.upload() // wait for context to be done
if err != nil { defer func() {
gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err) u.done <- struct{}{}
return close(u.done)
// if no scanid is generated no results were uploaded
if u.scanID == "" {
gologger.Verbose().Msgf("Scan results upload to cloud skipped, no results found to upload")
} else {
gologger.Info().Msgf("%v Scan results uploaded to cloud, you can view scan results at %v", u.counter.Load(), getScanDashBoardURL(u.scanID))
}
}()
// temporary buffer to store the results
buff := &bytes.Buffer{}
ticker := time.NewTicker(flushTimer)
for {
select {
case <-ctx.Done():
// flush before exit
if buff.Len() > 0 {
if err := u.uploadChunk(buff); err != nil {
gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
}
}
return
case <-ticker.C:
// flush the buffer
if buff.Len() > 0 {
if err := u.uploadChunk(buff); err != nil {
gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
}
}
case line, ok := <-ch:
if !ok {
if buff.Len() > 0 {
if err := u.uploadChunk(buff); err != nil {
gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
}
}
return
}
if buff.Len()+len(line) > MaxChunkSize {
// flush existing buffer
if err := u.uploadChunk(buff); err != nil {
gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
}
} else {
buff.WriteString(line)
}
}
} }
gologger.Info().Msgf("Scan results uploaded! View them at %v", getScanDashBoardURL(id))
} }
func (u *UploadWriter) upload() (string, error) { // uploadChunk uploads a chunk of data to the server
req, err := retryablehttp.NewRequest(http.MethodPost, u.uploadURL.String(), u.tempFile) func (u *UploadWriter) uploadChunk(buff *bytes.Buffer) error {
if err != nil { if err := u.upload(buff.Bytes()); err != nil {
return "", errorutil.NewWithErr(err).Msgf("could not create cloud upload request") return errorutil.NewWithErr(err).Msgf("could not upload chunk")
} }
req.Header.Set(pdcpauth.ApiKeyHeaderName, u.creds.APIKey) // if successful, reset the buffer
req.Header.Set("Content-Type", "application/octet-stream") buff.Reset()
req.Header.Set("Accept", "application/json") // log in verbose mode
gologger.Warning().Msgf("Uploaded results chunk, you can view scan results at %v", getScanDashBoardURL(u.scanID))
return nil
}
opts := retryablehttp.DefaultOptionsSingle func (u *UploadWriter) upload(data []byte) error {
// we are uploading nuclei results which can be large req, err := u.getRequest(data)
// server has a size limit of ~20ish MB
opts.Timeout = time.Duration(3) * time.Minute
client := retryablehttp.NewClient(opts)
resp, err := client.Do(req)
if err != nil { if err != nil {
return "", errorutil.NewWithErr(err).Msgf("could not upload results") return errorutil.NewWithErr(err).Msgf("could not create upload request")
}
resp, err := u.client.Do(req)
if err != nil {
return errorutil.NewWithErr(err).Msgf("could not upload results")
} }
defer resp.Body.Close() defer resp.Body.Close()
bin, err := io.ReadAll(resp.Body) bin, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return "", errorutil.NewWithErr(err).Msgf("could not get id from response") return errorutil.NewWithErr(err).Msgf("could not get id from response")
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("could not upload results got status code %v", resp.StatusCode) return fmt.Errorf("could not upload results got status code %v on %v", resp.StatusCode, resp.Request.URL.String())
} }
var uploadResp uploadResponse var uploadResp uploadResponse
if err := json.Unmarshal(bin, &uploadResp); err != nil { if err := json.Unmarshal(bin, &uploadResp); err != nil {
return "", errorutil.NewWithErr(err).Msgf("could not unmarshal response got %v", string(bin)) return errorutil.NewWithErr(err).Msgf("could not unmarshal response got %v", string(bin))
} }
u.removeTempFile() if uploadResp.ID != "" && u.scanID == "" {
return uploadResp.ID, nil u.scanID = uploadResp.ID
}
return nil
} }
// removeTempFile removes the temporary file // getRequest returns a new request for upload
func (u *UploadWriter) removeTempFile() { // if scanID is not provided create new scan by uploading the data
_ = os.Remove(u.tempFile.Name()) // if scanID is provided append the data to existing scan
func (u *UploadWriter) getRequest(bin []byte) (*retryablehttp.Request, error) {
var method, url string
if u.scanID == "" {
u.uploadURL.Path = uploadEndpoint
method = http.MethodPost
url = u.uploadURL.String()
} else {
u.uploadURL.Path = fmt.Sprintf(appendEndpoint, u.scanID)
method = http.MethodPatch
url = u.uploadURL.String()
}
req, err := retryablehttp.NewRequest(method, url, bytes.NewReader(bin))
if err != nil {
return nil, errorutil.NewWithErr(err).Msgf("could not create cloud upload request")
}
req.Header.Set(pdcpauth.ApiKeyHeaderName, u.creds.APIKey)
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Accept", "application/json")
return req, nil
} }
// Close closes the upload writer // Close closes the upload writer
func (u *UploadWriter) Close() { func (u *UploadWriter) Close() {
if !u.done.Load() { u.cancel()
u.Upload() <-u.done
} u.StandardWriter.Close()
}
func getScanDashBoardURL(id string) string {
ux, _ := urlutil.Parse(pdcpauth.DashBoardURL)
ux.Path = "/scans/" + id
ux.Update()
return ux.String()
} }

View File

@ -354,6 +354,10 @@ func (r *Runner) Close() {
// setupPDCPUpload sets up the PDCP upload writer // setupPDCPUpload sets up the PDCP upload writer
// by creating a new writer and returning it // by creating a new writer and returning it
func (r *Runner) setupPDCPUpload(writer output.Writer) output.Writer { func (r *Runner) setupPDCPUpload(writer output.Writer) output.Writer {
// if scanid is given implicitly consider that scan upload is enabled
if r.options.ScanID != "" {
r.options.EnableCloudUpload = true
}
if !(r.options.EnableCloudUpload || EnableCloudUpload) { if !(r.options.EnableCloudUpload || EnableCloudUpload) {
r.pdcpUploadErrMsg = fmt.Sprintf("[%v] Scan results upload to cloud is disabled.", aurora.BrightYellow("WRN")) r.pdcpUploadErrMsg = fmt.Sprintf("[%v] Scan results upload to cloud is disabled.", aurora.BrightYellow("WRN"))
return writer return writer
@ -368,11 +372,14 @@ func (r *Runner) setupPDCPUpload(writer output.Writer) output.Writer {
r.pdcpUploadErrMsg = fmt.Sprintf("[%v] To view results on Cloud Dashboard, Configure API key from %v", color.BrightYellow("WRN"), pdcpauth.DashBoardURL) r.pdcpUploadErrMsg = fmt.Sprintf("[%v] To view results on Cloud Dashboard, Configure API key from %v", color.BrightYellow("WRN"), pdcpauth.DashBoardURL)
return writer return writer
} }
uploadWriter, err := pdcp.NewUploadWriter(creds) uploadWriter, err := pdcp.NewUploadWriter(context.Background(), creds)
if err != nil { if err != nil {
r.pdcpUploadErrMsg = fmt.Sprintf("[%v] PDCP (%v) Auto-Save Failed: %s\n", color.BrightYellow("WRN"), pdcpauth.DashBoardURL, err) r.pdcpUploadErrMsg = fmt.Sprintf("[%v] PDCP (%v) Auto-Save Failed: %s\n", color.BrightYellow("WRN"), pdcpauth.DashBoardURL, err)
return writer return writer
} }
if r.options.ScanID != "" {
uploadWriter.SetScanID(r.options.ScanID)
}
return output.NewMultiWriter(writer, uploadWriter) return output.NewMultiWriter(writer, uploadWriter)
} }

View File

@ -364,6 +364,8 @@ type Options struct {
EnableCodeTemplates bool EnableCodeTemplates bool
// Disables cloud upload // Disables cloud upload
EnableCloudUpload bool EnableCloudUpload bool
// ScanID is the scan ID to use for cloud upload
ScanID string
} }
// ShouldLoadResume resume file // ShouldLoadResume resume file