From 4bfb849310a2d2befbabddc62fd79aca0d028953 Mon Sep 17 00:00:00 2001 From: mikcl <43545032+Mikcl@users.noreply.github.com> Date: Wed, 11 Jan 2023 20:18:02 +0000 Subject: [PATCH] Parallel package catalog processing (#1355) * catalog: run catalogers concurrently Signed-off-by: mikcl * frontend: expose workers as a configurable option Signed-off-by: mikcl * fixup! frontend: expose workers as a configurable option Signed-off-by: mikcl * update logging statements Signed-off-by: Alex Goodman * test: assert for debug logging Signed-off-by: mikcl Signed-off-by: mikcl Signed-off-by: Alex Goodman Co-authored-by: Alex Goodman --- README.md | 3 + internal/config/application.go | 5 +- syft/lib.go | 2 +- syft/pkg/cataloger/catalog.go | 147 +++++++++++++++++----- syft/pkg/cataloger/config.go | 8 +- test/cli/packages_cmd_test.go | 25 ++++ test/integration/catalog_packages_test.go | 2 +- 7 files changed, 152 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 1ff6ea649..14ccbf40f 100644 --- a/README.md +++ b/README.md @@ -522,6 +522,9 @@ file-metadata: # SYFT_FILE_METADATA_DIGESTS env var digests: ["sha256"] +# maximum number of workers used to process the list of package catalogers in parallel +parallelism: 1 + # cataloging secrets is exposed through the power-user subcommand secrets: cataloger: diff --git a/internal/config/application.go b/internal/config/application.go index 2c38f1e98..b4589152e 100644 --- a/internal/config/application.go +++ b/internal/config/application.go @@ -56,6 +56,7 @@ type Application struct { Attest attest `yaml:"attest" json:"attest" mapstructure:"attest"` Platform string `yaml:"platform" json:"platform" mapstructure:"platform"` Name string `yaml:"name" json:"name" mapstructure:"name"` + Parallelism int `yaml:"parallelism" json:"parallelism" mapstructure:"parallelism"` // the number of catalog workers to run in parallel } func (cfg Application) ToCatalogerConfig() cataloger.Config { @@ -65,7 +66,8 @@ func (cfg Application) 
ToCatalogerConfig() cataloger.Config { IncludeUnindexedArchives: cfg.Package.SearchUnindexedArchives, Scope: cfg.Package.Cataloger.ScopeOpt, }, - Catalogers: cfg.Catalogers, + Catalogers: cfg.Catalogers, + Parallelism: cfg.Parallelism, } } @@ -182,6 +184,7 @@ func loadDefaultValues(v *viper.Viper) { v.SetDefault("quiet", false) v.SetDefault("check-for-app-update", true) v.SetDefault("catalogers", nil) + v.SetDefault("parallelism", 1) // for each field in the configuration struct, see if the field implements the defaultValueLoader interface and invoke it if it does value := reflect.ValueOf(Application{}) diff --git a/syft/lib.go b/syft/lib.go index 44261c384..bfed4bb5b 100644 --- a/syft/lib.go +++ b/syft/lib.go @@ -69,7 +69,7 @@ func CatalogPackages(src *source.Source, cfg cataloger.Config) (*pkg.Catalog, [] } } - catalog, relationships, err := cataloger.Catalog(resolver, release, catalogers...) + catalog, relationships, err := cataloger.Catalog(resolver, release, cfg.Parallelism, catalogers...) if err != nil { return nil, nil, nil, err } diff --git a/syft/pkg/cataloger/catalog.go b/syft/pkg/cataloger/catalog.go index ce0c7158e..859e66e2b 100644 --- a/syft/pkg/cataloger/catalog.go +++ b/syft/pkg/cataloger/catalog.go @@ -2,6 +2,8 @@ package cataloger import ( "fmt" + "math" + "sync" "github.com/hashicorp/go-multierror" "github.com/wagoodman/go-partybus" @@ -23,6 +25,15 @@ type Monitor struct { PackagesDiscovered progress.Monitorable // the number of packages discovered from all registered catalogers } +// catalogResult provides the result of running a single cataloger against source +type catalogResult struct { + Packages []pkg.Package + Relationships []artifact.Relationship + // Discovered may sometimes be more than len(packages) + Discovered int64 + Error error +} + // newMonitor creates a new Monitor object and publishes the object on the bus as a PackageCatalogerStarted event. 
func newMonitor() (*progress.Manual, *progress.Manual) { filesProcessed := progress.Manual{} @@ -38,53 +49,121 @@ func newMonitor() (*progress.Manual, *progress.Manual) { return &filesProcessed, &packagesDiscovered } +func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver) (*catalogResult, error) { + catalogerResult := new(catalogResult) + + // find packages from the underlying raw data + log.WithFields("cataloger", cataloger.Name()).Trace("cataloging started") + packages, relationships, err := cataloger.Catalog(resolver) + if err != nil { + log.WithFields("cataloger", cataloger.Name()).Warn("error while cataloging") + return catalogerResult, err + } + + catalogedPackages := len(packages) + + log.WithFields("cataloger", cataloger.Name()).Debugf("discovered %d packages", catalogedPackages) + catalogerResult.Discovered = int64(catalogedPackages) + + for _, p := range packages { + // generate CPEs (note: this is excluded from package ID, so is safe to mutate) + // we might have binary classified CPE already with the package so we want to append here + p.CPEs = append(p.CPEs, cpe.Generate(p)...) + + // if we were not able to identify the language we have an opportunity + // to try and get this value from the PURL. Worst case we assert that + // we could not identify the language at either stage and set UnknownLanguage + if p.Language == "" { + p.Language = pkg.LanguageFromPURL(p.PURL) + } + + // create file-to-package relationships for files owned by the package + owningRelationships, err := packageFileOwnershipRelationships(p, resolver) + if err != nil { + log.WithFields("cataloger", cataloger.Name(), "package", p.Name, "error", err).Warnf("unable to create any package-file relationships") + } else { + catalogerResult.Relationships = append(catalogerResult.Relationships, owningRelationships...) 
+ } + catalogerResult.Packages = append(catalogerResult.Packages, p) + } + catalogerResult.Relationships = append(catalogerResult.Relationships, relationships...) + log.WithFields("cataloger", cataloger.Name()).Trace("cataloging complete") + return catalogerResult, nil +} + // Catalog a given source (container image or filesystem) with the given catalogers, returning all discovered packages. // In order to efficiently retrieve contents from a underlying container image the content fetch requests are // done in bulk. Specifically, all files of interest are collected from each catalogers and accumulated into a single // request. -func Catalog(resolver source.FileResolver, release *linux.Release, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) { +// +//nolint:funlen +func Catalog(resolver source.FileResolver, release *linux.Release, parallelism int, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) { catalog := pkg.NewCatalog() var allRelationships []artifact.Relationship filesProcessed, packagesDiscovered := newMonitor() // perform analysis, accumulating errors for each failed analysis var errs error - for _, c := range catalogers { - // find packages from the underlying raw data - log.Debugf("cataloging with %q", c.Name()) - packages, relationships, err := c.Catalog(resolver) - if err != nil { - errs = multierror.Append(errs, err) + + nCatalogers := len(catalogers) + + // we do not need more parallelism than there are `catalogers`. 
+ parallelism = int(math.Min(float64(nCatalogers), math.Max(1.0, float64(parallelism)))) + log.WithFields("parallelism", parallelism, "catalogers", nCatalogers).Debug("cataloging packages") + + jobs := make(chan pkg.Cataloger, nCatalogers) + results := make(chan *catalogResult, nCatalogers) + discoveredPackages := make(chan int64, nCatalogers) + + waitGroup := sync.WaitGroup{} + + for i := 0; i < parallelism; i++ { + waitGroup.Add(1) + + go func() { + defer waitGroup.Done() + + // wait for / get the next cataloger job available. + for cataloger := range jobs { + result, err := runCataloger(cataloger, resolver) + + // ensure we set the error to be aggregated + result.Error = err + + discoveredPackages <- result.Discovered + + results <- result + } + }() + } + + // dynamically show updated discovered package status + go func() { + for discovered := range discoveredPackages { + packagesDiscovered.N += discovered + } + }() + + // Enqueue the jobs + for _, cataloger := range catalogers { + jobs <- cataloger + } + close(jobs) + + // Wait for the jobs to finish + waitGroup.Wait() + close(results) + close(discoveredPackages) + + // collect the results + for result := range results { + if result.Error != nil { + errs = multierror.Append(errs, result.Error) continue } - - catalogedPackages := len(packages) - - log.Debugf("discovered %d packages", catalogedPackages) - packagesDiscovered.N += int64(catalogedPackages) - - for _, p := range packages { - // generate CPEs (note: this is excluded from package ID, so is safe to mutate) - // we might have binary classified CPE already with the package so we want to append here - p.CPEs = append(p.CPEs, cpe.Generate(p)...) - - // if we were not able to identify the language we have an opportunity - // to try and get this value from the PURL. 
Worst case we assert that - // we could not identify the language at either stage and set UnknownLanguage - if p.Language == "" { - p.Language = pkg.LanguageFromPURL(p.PURL) - } - - // create file-to-package relationships for files owned by the package - owningRelationships, err := packageFileOwnershipRelationships(p, resolver) - if err != nil { - log.Warnf("unable to create any package-file relationships for package name=%q: %w", p.Name, err) - } else { - allRelationships = append(allRelationships, owningRelationships...) - } + for _, p := range result.Packages { catalog.Add(p) } - - allRelationships = append(allRelationships, relationships...) + allRelationships = append(allRelationships, result.Relationships...) } allRelationships = append(allRelationships, pkg.NewRelationships(catalog)...) diff --git a/syft/pkg/cataloger/config.go b/syft/pkg/cataloger/config.go index 478fc292d..c75e34681 100644 --- a/syft/pkg/cataloger/config.go +++ b/syft/pkg/cataloger/config.go @@ -5,13 +5,15 @@ import ( ) type Config struct { - Search SearchConfig - Catalogers []string + Search SearchConfig + Catalogers []string + Parallelism int } func DefaultConfig() Config { return Config{ - Search: DefaultSearchConfig(), + Search: DefaultSearchConfig(), + Parallelism: 1, } } diff --git a/test/cli/packages_cmd_test.go b/test/cli/packages_cmd_test.go index fe57819e7..6a768dcdb 100644 --- a/test/cli/packages_cmd_test.go +++ b/test/cli/packages_cmd_test.go @@ -204,6 +204,31 @@ func TestPackagesCmdFlags(t *testing.T) { assertSuccessfulReturnCode, }, }, + { + name: "override-default-parallelism", + args: []string{"packages", "-vvv", "-o", "json", coverageImage}, + env: map[string]string{ + "SYFT_PARALLELISM": "2", + }, + assertions: []traitAssertion{ + // the application config in the log matches that of what we expect to have been configured. 
+ assertInOutput("parallelism: 2"), + assertInOutput("parallelism=2"), + assertPackageCount(34), + assertSuccessfulReturnCode, + }, + }, + { + name: "default-parallelism", + args: []string{"packages", "-vvv", "-o", "json", coverageImage}, + assertions: []traitAssertion{ + // the application config in the log matches that of what we expect to have been configured. + assertInOutput("parallelism: 1"), + assertInOutput("parallelism=1"), + assertPackageCount(34), + assertSuccessfulReturnCode, + }, + }, } for _, test := range tests { diff --git a/test/integration/catalog_packages_test.go b/test/integration/catalog_packages_test.go index 8a54b1b03..84cb86ad2 100644 --- a/test/integration/catalog_packages_test.go +++ b/test/integration/catalog_packages_test.go @@ -41,7 +41,7 @@ func BenchmarkImagePackageCatalogers(b *testing.B) { b.Run(c.Name(), func(b *testing.B) { for i := 0; i < b.N; i++ { - pc, _, err = cataloger.Catalog(resolver, theDistro, c) + pc, _, err = cataloger.Catalog(resolver, theDistro, 1, c) if err != nil { b.Fatalf("failure during benchmark: %+v", err) }