mirror of
https://github.com/anchore/syft.git
synced 2025-11-17 08:23:15 +01:00
Parallel package catalog processing (#1355)
* catalog: run cataloggers concurrently Signed-off-by: mikcl <mikesmikes400@gmail.com> * frontend: expose workers as a configurable option Signed-off-by: mikcl <mikesmikes400@gmail.com> * fixup! frontend: expose workers as a configurable option Signed-off-by: mikcl <mikesmikes400@gmail.com> * update logging statements Signed-off-by: Alex Goodman <alex.goodman@anchore.com> * test: assert for debug logging Signed-off-by: mikcl <mikesmikes400@gmail.com> Signed-off-by: mikcl <mikesmikes400@gmail.com> Signed-off-by: Alex Goodman <alex.goodman@anchore.com> Co-authored-by: Alex Goodman <alex.goodman@anchore.com>
This commit is contained in:
parent
d524bd5fc3
commit
4bfb849310
@ -522,6 +522,9 @@ file-metadata:
|
||||
# SYFT_FILE_METADATA_DIGESTS env var
|
||||
digests: ["sha256"]
|
||||
|
||||
# maximum number of workers used to process the list of package catalogers in parallel
|
||||
parallelism: 1
|
||||
|
||||
# cataloging secrets is exposed through the power-user subcommand
|
||||
secrets:
|
||||
cataloger:
|
||||
|
||||
@ -56,6 +56,7 @@ type Application struct {
|
||||
Attest attest `yaml:"attest" json:"attest" mapstructure:"attest"`
|
||||
Platform string `yaml:"platform" json:"platform" mapstructure:"platform"`
|
||||
Name string `yaml:"name" json:"name" mapstructure:"name"`
|
||||
Parallelism int `yaml:"parallelism" json:"parallelism" mapstructure:"parallelism"` // the number of catalog workers to run in parallel
|
||||
}
|
||||
|
||||
func (cfg Application) ToCatalogerConfig() cataloger.Config {
|
||||
@ -66,6 +67,7 @@ func (cfg Application) ToCatalogerConfig() cataloger.Config {
|
||||
Scope: cfg.Package.Cataloger.ScopeOpt,
|
||||
},
|
||||
Catalogers: cfg.Catalogers,
|
||||
Parallelism: cfg.Parallelism,
|
||||
}
|
||||
}
|
||||
|
||||
@ -182,6 +184,7 @@ func loadDefaultValues(v *viper.Viper) {
|
||||
v.SetDefault("quiet", false)
|
||||
v.SetDefault("check-for-app-update", true)
|
||||
v.SetDefault("catalogers", nil)
|
||||
v.SetDefault("parallelism", 1)
|
||||
|
||||
// for each field in the configuration struct, see if the field implements the defaultValueLoader interface and invoke it if it does
|
||||
value := reflect.ValueOf(Application{})
|
||||
|
||||
@ -69,7 +69,7 @@ func CatalogPackages(src *source.Source, cfg cataloger.Config) (*pkg.Catalog, []
|
||||
}
|
||||
}
|
||||
|
||||
catalog, relationships, err := cataloger.Catalog(resolver, release, catalogers...)
|
||||
catalog, relationships, err := cataloger.Catalog(resolver, release, cfg.Parallelism, catalogers...)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
@ -2,6 +2,8 @@ package cataloger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/wagoodman/go-partybus"
|
||||
@ -23,6 +25,15 @@ type Monitor struct {
|
||||
PackagesDiscovered progress.Monitorable // the number of packages discovered from all registered catalogers
|
||||
}
|
||||
|
||||
// catalogResult provides the result of running a single cataloger against source
|
||||
type catalogResult struct {
|
||||
Packages []pkg.Package
|
||||
Relationships []artifact.Relationship
|
||||
// Discovered is the number of packages returned by the cataloger; it may sometimes be more than len(Packages)
|
||||
Discovered int64
|
||||
Error error
|
||||
}
|
||||
|
||||
// newMonitor creates a new Monitor object and publishes the object on the bus as a PackageCatalogerStarted event.
|
||||
func newMonitor() (*progress.Manual, *progress.Manual) {
|
||||
filesProcessed := progress.Manual{}
|
||||
@ -38,29 +49,21 @@ func newMonitor() (*progress.Manual, *progress.Manual) {
|
||||
return &filesProcessed, &packagesDiscovered
|
||||
}
|
||||
|
||||
// Catalog a given source (container image or filesystem) with the given catalogers, returning all discovered packages.
|
||||
// In order to efficiently retrieve contents from an underlying container image the content fetch requests are
|
||||
// done in bulk. Specifically, all files of interest are collected from each cataloger and accumulated into a single
|
||||
// request.
|
||||
func Catalog(resolver source.FileResolver, release *linux.Release, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) {
|
||||
catalog := pkg.NewCatalog()
|
||||
var allRelationships []artifact.Relationship
|
||||
filesProcessed, packagesDiscovered := newMonitor()
|
||||
// perform analysis, accumulating errors for each failed analysis
|
||||
var errs error
|
||||
for _, c := range catalogers {
|
||||
func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver) (*catalogResult, error) {
|
||||
catalogerResult := new(catalogResult)
|
||||
|
||||
// find packages from the underlying raw data
|
||||
log.Debugf("cataloging with %q", c.Name())
|
||||
packages, relationships, err := c.Catalog(resolver)
|
||||
log.WithFields("cataloger", cataloger.Name()).Trace("cataloging started")
|
||||
packages, relationships, err := cataloger.Catalog(resolver)
|
||||
if err != nil {
|
||||
errs = multierror.Append(errs, err)
|
||||
continue
|
||||
log.WithFields("cataloger", cataloger.Name()).Warn("error while cataloging")
|
||||
return catalogerResult, err
|
||||
}
|
||||
|
||||
catalogedPackages := len(packages)
|
||||
|
||||
log.Debugf("discovered %d packages", catalogedPackages)
|
||||
packagesDiscovered.N += int64(catalogedPackages)
|
||||
log.WithFields("cataloger", cataloger.Name()).Debugf("discovered %d packages", catalogedPackages)
|
||||
catalogerResult.Discovered = int64(catalogedPackages)
|
||||
|
||||
for _, p := range packages {
|
||||
// generate CPEs (note: this is excluded from package ID, so is safe to mutate)
|
||||
@ -77,14 +80,90 @@ func Catalog(resolver source.FileResolver, release *linux.Release, catalogers ..
|
||||
// create file-to-package relationships for files owned by the package
|
||||
owningRelationships, err := packageFileOwnershipRelationships(p, resolver)
|
||||
if err != nil {
|
||||
log.Warnf("unable to create any package-file relationships for package name=%q: %w", p.Name, err)
|
||||
log.WithFields("cataloger", cataloger.Name(), "package", p.Name, "error", err).Warnf("unable to create any package-file relationships")
|
||||
} else {
|
||||
allRelationships = append(allRelationships, owningRelationships...)
|
||||
catalogerResult.Relationships = append(catalogerResult.Relationships, owningRelationships...)
|
||||
}
|
||||
catalog.Add(p)
|
||||
catalogerResult.Packages = append(catalogerResult.Packages, p)
|
||||
}
|
||||
catalogerResult.Relationships = append(catalogerResult.Relationships, relationships...)
|
||||
log.WithFields("cataloger", cataloger.Name()).Trace("cataloging complete")
|
||||
return catalogerResult, nil
|
||||
}
|
||||
|
||||
// Catalog a given source (container image or filesystem) with the given catalogers, returning all discovered packages.
|
||||
// In order to efficiently retrieve contents from an underlying container image the content fetch requests are
|
||||
// done in bulk. Specifically, all files of interest are collected from each cataloger and accumulated into a single
|
||||
// request.
|
||||
//
|
||||
//nolint:funlen
|
||||
func Catalog(resolver source.FileResolver, release *linux.Release, parallelism int, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) {
|
||||
catalog := pkg.NewCatalog()
|
||||
var allRelationships []artifact.Relationship
|
||||
filesProcessed, packagesDiscovered := newMonitor()
|
||||
// perform analysis, accumulating errors for each failed analysis
|
||||
var errs error
|
||||
|
||||
nCatalogers := len(catalogers)
|
||||
|
||||
// we do not need more parallelism than there are `catalogers`.
|
||||
parallelism = int(math.Min(float64(nCatalogers), math.Max(1.0, float64(parallelism))))
|
||||
log.WithFields("parallelism", parallelism, "catalogers", nCatalogers).Debug("cataloging packages")
|
||||
|
||||
jobs := make(chan pkg.Cataloger, nCatalogers)
|
||||
results := make(chan *catalogResult, nCatalogers)
|
||||
discoveredPackages := make(chan int64, nCatalogers)
|
||||
|
||||
waitGroup := sync.WaitGroup{}
|
||||
|
||||
for i := 0; i < parallelism; i++ {
|
||||
waitGroup.Add(1)
|
||||
|
||||
go func() {
|
||||
defer waitGroup.Done()
|
||||
|
||||
// wait for / get the next cataloger job available.
|
||||
for cataloger := range jobs {
|
||||
result, err := runCataloger(cataloger, resolver)
|
||||
|
||||
// ensure we set the error to be aggregated
|
||||
result.Error = err
|
||||
|
||||
discoveredPackages <- result.Discovered
|
||||
|
||||
results <- result
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
allRelationships = append(allRelationships, relationships...)
|
||||
// dynamically show updated discovered package status
|
||||
go func() {
|
||||
for discovered := range discoveredPackages {
|
||||
packagesDiscovered.N += discovered
|
||||
}
|
||||
}()
|
||||
|
||||
// Enqueue the jobs
|
||||
for _, cataloger := range catalogers {
|
||||
jobs <- cataloger
|
||||
}
|
||||
close(jobs)
|
||||
|
||||
// Wait for the jobs to finish
|
||||
waitGroup.Wait()
|
||||
close(results)
|
||||
close(discoveredPackages)
|
||||
|
||||
// collect the results
|
||||
for result := range results {
|
||||
if result.Error != nil {
|
||||
errs = multierror.Append(errs, result.Error)
|
||||
continue
|
||||
}
|
||||
for _, p := range result.Packages {
|
||||
catalog.Add(p)
|
||||
}
|
||||
allRelationships = append(allRelationships, result.Relationships...)
|
||||
}
|
||||
|
||||
allRelationships = append(allRelationships, pkg.NewRelationships(catalog)...)
|
||||
|
||||
@ -7,11 +7,13 @@ import (
|
||||
type Config struct {
|
||||
Search SearchConfig
|
||||
Catalogers []string
|
||||
Parallelism int
|
||||
}
|
||||
|
||||
func DefaultConfig() Config {
|
||||
return Config{
|
||||
Search: DefaultSearchConfig(),
|
||||
Parallelism: 1,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -204,6 +204,31 @@ func TestPackagesCmdFlags(t *testing.T) {
|
||||
assertSuccessfulReturnCode,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "override-default-parallelism",
|
||||
args: []string{"packages", "-vvv", "-o", "json", coverageImage},
|
||||
env: map[string]string{
|
||||
"SYFT_PARALLELISM": "2",
|
||||
},
|
||||
assertions: []traitAssertion{
|
||||
// the application config in the log matches what we expect to have been configured.
|
||||
assertInOutput("parallelism: 2"),
|
||||
assertInOutput("parallelism=2"),
|
||||
assertPackageCount(34),
|
||||
assertSuccessfulReturnCode,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "default-parallelism",
|
||||
args: []string{"packages", "-vvv", "-o", "json", coverageImage},
|
||||
assertions: []traitAssertion{
|
||||
// the application config in the log matches what we expect to have been configured.
|
||||
assertInOutput("parallelism: 1"),
|
||||
assertInOutput("parallelism=1"),
|
||||
assertPackageCount(34),
|
||||
assertSuccessfulReturnCode,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
||||
@ -41,7 +41,7 @@ func BenchmarkImagePackageCatalogers(b *testing.B) {
|
||||
|
||||
b.Run(c.Name(), func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
pc, _, err = cataloger.Catalog(resolver, theDistro, c)
|
||||
pc, _, err = cataloger.Catalog(resolver, theDistro, 1, c)
|
||||
if err != nil {
|
||||
b.Fatalf("failure during benchmark: %+v", err)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user