diff --git a/cmd/syft/internal/options/catalog.go b/cmd/syft/internal/options/catalog.go index 599af87f2..a3fd88c46 100644 --- a/cmd/syft/internal/options/catalog.go +++ b/cmd/syft/internal/options/catalog.go @@ -68,6 +68,7 @@ var _ interface { } = (*Catalog)(nil) func DefaultCatalog() Catalog { + cfg := syft.DefaultCreateSBOMConfig() return Catalog{ Compliance: defaultComplianceConfig(), Scope: source.SquashedScope.String(), @@ -81,7 +82,7 @@ func DefaultCatalog() Catalog { Relationships: defaultRelationshipsConfig(), Unknowns: defaultUnknowns(), Source: defaultSourceConfig(), - Parallelism: 1, + Parallelism: cfg.Parallelism, } } @@ -222,6 +223,9 @@ func (cfg *Catalog) AddFlags(flags clio.FlagSet) { flags.StringArrayVarP(&cfg.Catalogers, "catalogers", "", "enable one or more package catalogers") + flags.IntVarP(&cfg.Parallelism, "parallelism", "", + "number of cataloger workers to run in parallel") + if pfp, ok := flags.(fangs.PFlagSetProvider); ok { if err := pfp.PFlagSet().MarkDeprecated("catalogers", "use: override-default-catalogers and select-catalogers"); err != nil { panic(err) @@ -250,7 +254,8 @@ func (cfg *Catalog) AddFlags(flags clio.FlagSet) { } func (cfg *Catalog) DescribeFields(descriptions fangs.FieldDescriptionSet) { - descriptions.Add(&cfg.Parallelism, "number of cataloger workers to run in parallel") + descriptions.Add(&cfg.Parallelism, `number of cataloger workers to run in parallel +by default, when set to 0: this will be based on runtime.NumCPU * 4, if set to less than 0 it will be unbounded`) descriptions.Add(&cfg.Enrich, fmt.Sprintf(`Enable data enrichment operations, which can utilize services such as Maven Central and NPM. By default all enrichment is disabled, use: all to enable everything. diff --git a/go.mod b/go.mod index 1b44e8471..b80c8b255 100644 --- a/go.mod +++ b/go.mod @@ -260,6 +260,8 @@ require ( modernc.org/memory v1.8.2 // indirect ) +require github.com/anchore/go-sync v0.0.0-20250326131806-4eda43a485b6 + retract ( v0.53.2 v0.53.1 // Published accidentally with incorrect license in depdencies diff --git a/go.sum b/go.sum index 2c5a999fc..fe64b46cf 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk= github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc= github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA= +github.com/anchore/go-sync v0.0.0-20250326131806-4eda43a485b6 h1:Ha+LSCVuXYSYGi7wIkJK6G8g6jI3LH7y6LbyEVyp4Io= +github.com/anchore/go-sync v0.0.0-20250326131806-4eda43a485b6/go.mod h1:+9oM3XUy8iea/vWj9FhZ9bQGUBN8JpPxxJm5Wbcx9XM= github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8= github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ= github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods= diff --git a/internal/file/digest.go b/internal/file/digest.go index 8c9932d9f..c9aa1552c 100644 --- a/internal/file/digest.go +++ b/internal/file/digest.go @@ -1,12 +1,15 @@ package file import ( + "context" "crypto" "fmt" "hash" "io" "strings" + "github.com/anchore/go-sync" + "github.com/anchore/syft/syft/cataloging" "github.com/anchore/syft/syft/file" ) @@ -21,7 +24,7 @@ func supportedHashAlgorithms() []crypto.Hash { } } -func NewDigestsFromFile(closer io.ReadCloser, hashes []crypto.Hash) ([]file.Digest, error) { +func NewDigestsFromFile(ctx context.Context, closer io.ReadCloser, hashes []crypto.Hash) ([]file.Digest, error) { hashes = NormalizeHashes(hashes) // create a set of hasher objects tied together with a single writer to feed content into hashers := make([]hash.Hash, len(hashes)) @@ -31,7 +34,7 @@ func NewDigestsFromFile(closer io.ReadCloser, hashes []crypto.Hash) ([]file.Dige writers[idx] = hashers[idx] } - size, err := io.Copy(io.MultiWriter(writers...), closer) + size, err := io.Copy(sync.ParallelWriter(ctx, cataloging.ExecutorCPU, writers...), closer) if err != nil { return nil, err } diff --git a/internal/file/digest_test.go b/internal/file/digest_test.go index df50798f2..4e793743f 100644 --- a/internal/file/digest_test.go +++ b/internal/file/digest_test.go @@ -1,6 +1,7 @@ package file import ( + "context" "crypto" "os" "testing" @@ -81,7 +82,7 @@ func TestNewDigestsFromFile(t *testing.T) { fh, err := os.Open(tt.fixture) require.NoError(t, err) - got, err := NewDigestsFromFile(fh, tt.hashes) + got, err := NewDigestsFromFile(context.TODO(), fh, tt.hashes) tt.wantErr(t, err) if err != nil { return diff --git a/internal/task/executor.go b/internal/task/executor.go index 70727cf17..f078ed689 100644 --- a/internal/task/executor.go +++ b/internal/task/executor.go @@ -5,11 +5,8 @@ import ( "fmt" "runtime/debug" "slices" - "sync" "time" - "github.com/hashicorp/go-multierror" - "github.com/anchore/syft/internal/log" "github.com/anchore/syft/internal/sbomsync" "github.com/anchore/syft/internal/unknown" @@ -18,64 +15,17 @@ import ( "github.com/anchore/syft/syft/sbom" ) -type Executor struct { - numWorkers int - tasks chan Task -} - -func NewTaskExecutor(tasks []Task, numWorkers int) *Executor { - p := &Executor{ - numWorkers: numWorkers, - tasks: make(chan Task, len(tasks)), +func RunTask(ctx context.Context, tsk Task, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error { + err := runTaskSafely(ctx, tsk, resolver, s) + unknowns, remainingErrors := unknown.ExtractCoordinateErrors(err) + if len(unknowns) > 0 { + appendUnknowns(s, tsk.Name(), unknowns) } - - for i := range tasks { - p.tasks <- tasks[i] + if remainingErrors != nil { + prog.SetError(remainingErrors) } - close(p.tasks) - - return p -} - -func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error { - var lock sync.Mutex - withLock := func(fn func()) { - lock.Lock() - defer lock.Unlock() - fn() - } - var errs error - wg := &sync.WaitGroup{} - for i := 0; i < p.numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - for { - tsk, ok := <-p.tasks - if !ok { - return - } - - err := runTaskSafely(ctx, tsk, resolver, s) - unknowns, remainingErrors := unknown.ExtractCoordinateErrors(err) - if len(unknowns) > 0 { - appendUnknowns(s, tsk.Name(), unknowns) - } - if remainingErrors != nil { - withLock(func() { - errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", remainingErrors)) - prog.SetError(remainingErrors) - }) - } - prog.Increment() - } - }() - } - - wg.Wait() - - return errs + prog.Increment() + return remainingErrors } func appendUnknowns(builder sbomsync.Builder, taskName string, unknowns []unknown.CoordinateError) { diff --git a/internal/task/executor_test.go b/internal/task/executor_test.go index 5da1a0f40..c739d7e6b 100644 --- a/internal/task/executor_test.go +++ b/internal/task/executor_test.go @@ -16,9 +16,8 @@ func Test_TaskExecutor_PanicHandling(t *testing.T) { tsk := NewTask("panicking-cataloger", func(_ context.Context, _ file.Resolver, _ sbomsync.Builder) error { panic("something bad happened") }) - ex := NewTaskExecutor([]Task{tsk}, 1) - err := ex.Execute(context.Background(), nil, nil, &monitor.CatalogerTaskProgress{ + err := RunTask(context.Background(), tsk, nil, nil, &monitor.CatalogerTaskProgress{ Manual: progress.NewManual(-1), }) diff --git a/internal/task/file_tasks.go b/internal/task/file_tasks.go index 2ee08f72b..7af11fcd1 100644 --- a/internal/task/file_tasks.go +++ b/internal/task/file_tasks.go @@ -120,14 +120,14 @@ func newExecutableCatalogerTaskFactory(tags ...string) factory { } func newExecutableCatalogerTask(selection file.Selection, cfg executable.Config, tags ...string) Task { - fn := func(_ context.Context, resolver file.Resolver, builder sbomsync.Builder) error { + fn := func(ctx context.Context, resolver file.Resolver, builder sbomsync.Builder) error { if selection == file.NoFilesSelection { return nil } accessor := builder.(sbomsync.Accessor) - result, err := executable.NewCataloger(cfg).Catalog(resolver) + result, err := executable.NewCataloger(cfg).CatalogCtx(ctx, resolver) accessor.WriteToSBOM(func(sbom *sbom.SBOM) { sbom.Artifacts.Executables = result diff --git a/syft/cataloging/executor.go b/syft/cataloging/executor.go new file mode 100644 index 000000000..4fc997364 --- /dev/null +++ b/syft/cataloging/executor.go @@ -0,0 +1,8 @@ +package cataloging + +// ExecutorCPU is the name to use when executing parallel functions which are CPU-intensive, such as +// hashing full files +const ExecutorCPU = "cpu" + +// ExecutorFile is the name to use when executing parallel file reading functions, such as cataloging +const ExecutorFile = "file" diff --git a/syft/create_sbom.go b/syft/create_sbom.go index 1d578191a..aec9fdcae 100644 --- a/syft/create_sbom.go +++ b/syft/create_sbom.go @@ -3,16 +3,19 @@ package syft import ( "context" "fmt" + "runtime" "sort" "github.com/dustin/go-humanize" "github.com/scylladb/go-set/strset" + "github.com/anchore/go-sync" "github.com/anchore/syft/internal/bus" "github.com/anchore/syft/internal/licenses" "github.com/anchore/syft/internal/sbomsync" "github.com/anchore/syft/internal/task" "github.com/anchore/syft/syft/artifact" + "github.com/anchore/syft/syft/cataloging" "github.com/anchore/syft/syft/event/monitor" "github.com/anchore/syft/syft/pkg" "github.com/anchore/syft/syft/sbom" @@ -62,22 +65,20 @@ func CreateSBOM(ctx context.Context, src source.Source, cfg *CreateSBOMConfig) ( }, } - // inject a single license scanner and content config for all package cataloging tasks into context - licenseScanner, err := licenses.NewDefaultScanner( - licenses.WithIncludeLicenseContent(cfg.Licenses.IncludeUnkownLicenseContent), - licenses.WithCoverage(cfg.Licenses.Coverage), - ) + // setup everything we need in context: license scanner, executors, etc. + ctx, err = setupContext(ctx, cfg) if err != nil { - return nil, fmt.Errorf("could not build licenseScanner for cataloging: %w", err) + return nil, err } - ctx = licenses.SetContextLicenseScanner(ctx, licenseScanner) catalogingProgress := monitorCatalogingTask(src.ID(), taskGroups) packageCatalogingProgress := monitorPackageCatalogingTask() builder := sbomsync.NewBuilder(&s, monitorPackageCount(packageCatalogingProgress)) for i := range taskGroups { - err := task.NewTaskExecutor(taskGroups[i], cfg.Parallelism).Execute(ctx, resolver, builder, catalogingProgress) + err = sync.Collect(&ctx, cataloging.ExecutorFile, sync.ToSeq(taskGroups[i]), func(t task.Task) (any, error) { + return nil, task.RunTask(ctx, t, resolver, builder, catalogingProgress) + }, nil) if err != nil { // TODO: tie this to the open progress monitors... return nil, fmt.Errorf("failed to run tasks: %w", err) @@ -90,6 +91,53 @@ func CreateSBOM(ctx context.Context, src source.Source, cfg *CreateSBOMConfig) ( return &s, nil } +func setupContext(ctx context.Context, cfg *CreateSBOMConfig) (context.Context, error) { + // configure parallel executors + ctx = setContextExecutors(ctx, cfg) + + // configure license scanner + return setContextLicenseScanner(ctx, cfg) +} + +func setContextLicenseScanner(ctx context.Context, cfg *CreateSBOMConfig) (context.Context, error) { + // inject a single license scanner and content config for all package cataloging tasks into context + licenseScanner, err := licenses.NewDefaultScanner( + licenses.WithIncludeLicenseContent(cfg.Licenses.IncludeUnkownLicenseContent), + licenses.WithCoverage(cfg.Licenses.Coverage), + ) + if err != nil { + return nil, fmt.Errorf("could not build licenseScanner for cataloging: %w", err) + } + ctx = licenses.SetContextLicenseScanner(ctx, licenseScanner) + return ctx, nil +} + +func setContextExecutors(ctx context.Context, cfg *CreateSBOMConfig) context.Context { + parallelism := 0 + if cfg != nil { + parallelism = cfg.Parallelism + } + // executor parallelism is: 0 == serial, no goroutines, 1 == max 1 goroutine + // so if they set 1, we just run in serial to avoid overhead, and treat 0 as default, reasonable max for the system + // negative is unbounded, so no need for any other special handling + switch parallelism { + case 0: + parallelism = runtime.NumCPU() * 4 + case 1: + parallelism = 0 // run in serial, don't spawn goroutines + case -99: + parallelism = 1 // special case to catch incorrect executor usage during testing + } + // set up executors for each dimension we want to coordinate bounds for + if !sync.HasContextExecutor(ctx, cataloging.ExecutorCPU) { + ctx = sync.SetContextExecutor(ctx, cataloging.ExecutorCPU, sync.NewExecutor(parallelism)) + } + if !sync.HasContextExecutor(ctx, cataloging.ExecutorFile) { + ctx = sync.SetContextExecutor(ctx, cataloging.ExecutorFile, sync.NewExecutor(parallelism)) + } + return ctx +} + func monitorPackageCount(prog *monitor.CatalogerTaskProgress) func(s *sbom.SBOM) { return func(s *sbom.SBOM) { count := humanize.Comma(int64(s.Artifacts.Packages.PackageCount())) diff --git a/syft/create_sbom_config.go b/syft/create_sbom_config.go index 1d4d2eadf..0e7e828fd 100644 --- a/syft/create_sbom_config.go +++ b/syft/create_sbom_config.go @@ -49,7 +49,7 @@ func DefaultCreateSBOMConfig() *CreateSBOMConfig { Packages: pkgcataloging.DefaultConfig(), Licenses: cataloging.DefaultLicenseConfig(), Files: filecataloging.DefaultConfig(), - Parallelism: 1, + Parallelism: 0, // use default: run in parallel based on number of CPUs packageTaskFactories: task.DefaultPackageTaskFactories(), // library consumers are free to override the tool values to fit their needs, however, we have some sane defaults @@ -91,10 +91,6 @@ func (c *CreateSBOMConfig) WithTool(name, version string, cfg ...any) *CreateSBO // WithParallelism allows for setting the number of concurrent cataloging tasks that can be performed at once func (c *CreateSBOMConfig) WithParallelism(p int) *CreateSBOMConfig { - if p < 1 { - // TODO: warn? - p = 1 - } c.Parallelism = p return c } diff --git a/syft/file/cataloger/executable/cataloger.go b/syft/file/cataloger/executable/cataloger.go index 7bd49307c..05d29b009 100644 --- a/syft/file/cataloger/executable/cataloger.go +++ b/syft/file/cataloger/executable/cataloger.go @@ -2,6 +2,7 @@ package executable import ( "bytes" + "context" "debug/elf" "debug/macho" "encoding/binary" @@ -11,11 +12,13 @@ import ( "github.com/bmatcuk/doublestar/v4" "github.com/dustin/go-humanize" + "github.com/anchore/go-sync" "github.com/anchore/syft/internal" "github.com/anchore/syft/internal/bus" "github.com/anchore/syft/internal/log" "github.com/anchore/syft/internal/mimetype" "github.com/anchore/syft/internal/unknown" + "github.com/anchore/syft/syft/cataloging" "github.com/anchore/syft/syft/event/monitor" "github.com/anchore/syft/syft/file" "github.com/anchore/syft/syft/internal/unionreader" @@ -46,8 +49,10 @@ func NewCataloger(cfg Config) *Cataloger { } func (i *Cataloger) Catalog(resolver file.Resolver) (map[file.Coordinates]file.Executable, error) { - var errs error + return i.CatalogCtx(context.Background(), resolver) +} +func (i *Cataloger) CatalogCtx(ctx context.Context, resolver file.Resolver) (map[file.Coordinates]file.Executable, error) { locs, err := resolver.FilesByMIMEType(i.config.MIMETypes...) if err != nil { return nil, fmt.Errorf("unable to get file locations for binaries: %w", err) @@ -61,19 +66,20 @@ func (i *Cataloger) Catalog(resolver file.Resolver) (map[file.Coordinates]file.E prog := catalogingProgress(int64(len(locs))) results := make(map[file.Coordinates]file.Executable) - for _, loc := range locs { + errs := sync.Collect(&ctx, cataloging.ExecutorFile, sync.ToSeq(locs), func(loc file.Location) (*file.Executable, error) { prog.AtomicStage.Set(loc.Path()) exec, err := processExecutableLocation(loc, resolver) if err != nil { - errs = unknown.Append(errs, loc, err) + err = unknown.New(loc, err) } - + return exec, err + }, func(loc file.Location, exec *file.Executable) { if exec != nil { prog.Increment() results[loc.Coordinates] = *exec } - } + }) log.Debugf("executable cataloger processed %d files", len(results)) diff --git a/syft/file/cataloger/filedigest/cataloger.go b/syft/file/cataloger/filedigest/cataloger.go index f8aa9ace1..15a770ef1 100644 --- a/syft/file/cataloger/filedigest/cataloger.go +++ b/syft/file/cataloger/filedigest/cataloger.go @@ -8,12 +8,14 @@ import ( "github.com/dustin/go-humanize" + "github.com/anchore/go-sync" stereoscopeFile "github.com/anchore/stereoscope/pkg/file" "github.com/anchore/syft/internal" "github.com/anchore/syft/internal/bus" intFile "github.com/anchore/syft/internal/file" "github.com/anchore/syft/internal/log" "github.com/anchore/syft/internal/unknown" + "github.com/anchore/syft/syft/cataloging" "github.com/anchore/syft/syft/event/monitor" "github.com/anchore/syft/syft/file" intCataloger "github.com/anchore/syft/syft/file/cataloger/internal" @@ -34,7 +36,6 @@ func NewCataloger(hashes []crypto.Hash) *Cataloger { func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordinates ...file.Coordinates) (map[file.Coordinates][]file.Digest, error) { results := make(map[file.Coordinates][]file.Digest) var locations []file.Location - var errs error if len(coordinates) == 0 { locations = intCataloger.AllRegularFiles(ctx, resolver) @@ -49,41 +50,44 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin } prog := catalogingProgress(int64(len(locations))) - for _, location := range locations { - result, err := i.catalogLocation(resolver, location) + + err := sync.Collect(&ctx, cataloging.ExecutorFile, sync.ToSeq(locations), func(location file.Location) ([]file.Digest, error) { + result, err := i.catalogLocation(ctx, resolver, location) if errors.Is(err, ErrUndigestableFile) { - continue + return nil, nil } prog.AtomicStage.Set(location.Path()) if internal.IsErrPathPermission(err) { log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err) - errs = unknown.Append(errs, location, err) - continue + return nil, unknown.New(location, err) } if err != nil { prog.SetError(err) - errs = unknown.Append(errs, location, err) - continue + return nil, unknown.New(location, err) } prog.Increment() - results[location.Coordinates] = result - } + return result, nil + }, func(location file.Location, digests []file.Digest) { + if len(digests) > 0 { + results[location.Coordinates] = digests + } + }) log.Debugf("file digests cataloger processed %d files", prog.Current()) prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current()))) prog.SetCompleted() - return results, errs + return results, err } -func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) { +func (i *Cataloger) catalogLocation(ctx context.Context, resolver file.Resolver, location file.Location) ([]file.Digest, error) { meta, err := resolver.FileMetadataByLocation(location) if err != nil { return nil, err @@ -100,7 +104,7 @@ func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Locati } defer internal.CloseAndLogError(contentReader, location.AccessPath) - digests, err := intFile.NewDigestsFromFile(contentReader, i.hashes) + digests, err := intFile.NewDigestsFromFile(ctx, contentReader, i.hashes) if err != nil { return nil, internal.ErrPath{Context: "digests-cataloger", Path: location.RealPath, Err: err} } diff --git a/syft/file/cataloger/filedigest/cataloger_test.go b/syft/file/cataloger/filedigest/cataloger_test.go index 148a18525..5fbdf9bd1 100644 --- a/syft/file/cataloger/filedigest/cataloger_test.go +++ b/syft/file/cataloger/filedigest/cataloger_test.go @@ -65,13 +65,13 @@ func TestDigestsCataloger(t *testing.T) { name: "md5", digests: []crypto.Hash{crypto.MD5}, files: []string{"test-fixtures/last/empty/empty", "test-fixtures/last/path.txt"}, - expected: testDigests(t, "test-fixtures/last", []string{"empty/empty", "path.txt"}, crypto.MD5), + expected: testDigests(t, "test-fixtures/last", []string{"path.txt"}, crypto.MD5), }, { name: "md5-sha1-sha256", digests: []crypto.Hash{crypto.MD5, crypto.SHA1, crypto.SHA256}, files: []string{"test-fixtures/last/empty/empty", "test-fixtures/last/path.txt"}, - expected: testDigests(t, "test-fixtures/last", []string{"empty/empty", "path.txt"}, crypto.MD5, crypto.SHA1, crypto.SHA256), + expected: testDigests(t, "test-fixtures/last", []string{"path.txt"}, crypto.MD5, crypto.SHA1, crypto.SHA256), }, } diff --git a/syft/pkg/cataloger/debian/package.go b/syft/pkg/cataloger/debian/package.go index 280e57094..e44cb3844 100644 --- a/syft/pkg/cataloger/debian/package.go +++ b/syft/pkg/cataloger/debian/package.go @@ -26,7 +26,7 @@ func newDpkgPackage(d pkg.DpkgDBEntry, dbLocation file.Location, resolver file.R // TODO: separate pr to license refactor, but explore extracting dpkg-specific license parsing into a separate function var licenses []pkg.License - locations := file.NewLocationSet(dbLocation.WithAnnotation(pkg.EvidenceAnnotationKey, pkg.PrimaryEvidenceAnnotation)) + locations := file.NewLocationSet(dbLocation) locations.Add(evidence...) p := pkg.Package{ diff --git a/syft/pkg/cataloger/debian/parse_dpkg_db.go b/syft/pkg/cataloger/debian/parse_dpkg_db.go index 57fc065b8..0e8d00b5a 100644 --- a/syft/pkg/cataloger/debian/parse_dpkg_db.go +++ b/syft/pkg/cataloger/debian/parse_dpkg_db.go @@ -13,10 +13,12 @@ import ( "github.com/dustin/go-humanize" "github.com/go-viper/mapstructure/v2" + "github.com/anchore/go-sync" "github.com/anchore/syft/internal" "github.com/anchore/syft/internal/log" "github.com/anchore/syft/internal/unknown" "github.com/anchore/syft/syft/artifact" + "github.com/anchore/syft/syft/cataloging" "github.com/anchore/syft/syft/file" "github.com/anchore/syft/syft/pkg" "github.com/anchore/syft/syft/pkg/cataloger/generic" @@ -28,17 +30,17 @@ var ( ) // parseDpkgDB reads a dpkg database "status" file (and surrounding data files) and returns the packages and relationships found. -func parseDpkgDB(_ context.Context, resolver file.Resolver, env *generic.Environment, reader file.LocationReadCloser) ([]pkg.Package, []artifact.Relationship, error) { +func parseDpkgDB(ctx context.Context, resolver file.Resolver, env *generic.Environment, reader file.LocationReadCloser) ([]pkg.Package, []artifact.Relationship, error) { metadata, err := parseDpkgStatus(reader) if err != nil { return nil, nil, fmt.Errorf("unable to catalog dpkg DB=%q: %w", reader.RealPath, err) } + dbLoc := reader.Location.WithAnnotation(pkg.EvidenceAnnotationKey, pkg.PrimaryEvidenceAnnotation) var pkgs []pkg.Package - for _, m := range metadata { - p := newDpkgPackage(m, reader.Location, resolver, env.LinuxRelease, findDpkgInfoFiles(m.Package, resolver, reader.Location)...) - pkgs = append(pkgs, p) - } + _ = sync.CollectSlice(&ctx, cataloging.ExecutorFile, sync.ToSeq(metadata), func(m pkg.DpkgDBEntry) (pkg.Package, error) { + return newDpkgPackage(m, dbLoc, resolver, env.LinuxRelease, findDpkgInfoFiles(m.Package, resolver, reader.Location)...), nil + }, &pkgs) return pkgs, nil, unknown.IfEmptyf(pkgs, "unable to determine packages") } diff --git a/syft/pkg/cataloger/generic/cataloger.go b/syft/pkg/cataloger/generic/cataloger.go index df14b4c6e..91790d2b7 100644 --- a/syft/pkg/cataloger/generic/cataloger.go +++ b/syft/pkg/cataloger/generic/cataloger.go @@ -4,10 +4,12 @@ import ( "context" "github.com/anchore/go-logger" + "github.com/anchore/go-sync" "github.com/anchore/syft/internal" "github.com/anchore/syft/internal/log" "github.com/anchore/syft/internal/unknown" "github.com/anchore/syft/syft/artifact" + "github.com/anchore/syft/syft/cataloging" "github.com/anchore/syft/syft/file" "github.com/anchore/syft/syft/linux" "github.com/anchore/syft/syft/pkg" @@ -161,7 +163,11 @@ func (c *Cataloger) Catalog(ctx context.Context, resolver file.Resolver) ([]pkg. LinuxRelease: linux.IdentifyRelease(resolver), } - for _, req := range c.selectFiles(resolver) { + type result struct { + pkgs []pkg.Package + rels []artifact.Relationship + } + errs = sync.Collect(&ctx, cataloging.ExecutorFile, sync.ToSeq(c.selectFiles(resolver)), func(req request) (result, error) { location, parser := req.Location, req.Parser log.WithFields("path", location.RealPath).Trace("parsing file contents") @@ -171,14 +177,14 @@ func (c *Cataloger) Catalog(ctx context.Context, resolver file.Resolver) ([]pkg. // parsers may return errors and valid packages / relationships errs = unknown.Append(errs, location, err) } - - for _, p := range discoveredPackages { + return result{discoveredPackages, discoveredRelationships}, errs + }, func(_ request, res result) { + for _, p := range res.pkgs { p.FoundBy = c.upstreamCataloger packages = append(packages, p) } - - relationships = append(relationships, discoveredRelationships...) - } + relationships = append(relationships, res.rels...) + }) return c.process(ctx, resolver, packages, relationships, errs) } diff --git a/syft/pkg/cataloger/java/archive_parser.go b/syft/pkg/cataloger/java/archive_parser.go index 85cae3d28..8893d5008 100644 --- a/syft/pkg/cataloger/java/archive_parser.go +++ b/syft/pkg/cataloger/java/archive_parser.go @@ -249,7 +249,7 @@ func (j *archiveParser) discoverMainPackage(ctx context.Context) (*pkg.Package, } // grab and assign digest for the entire archive - digests, err := getDigestsFromArchive(j.archivePath) + digests, err := getDigestsFromArchive(ctx, j.archivePath) if err != nil { return nil, err } @@ -475,7 +475,7 @@ func (j *archiveParser) discoverPkgsFromAllMavenFiles(ctx context.Context, paren return pkgs, nil } -func getDigestsFromArchive(archivePath string) ([]file.Digest, error) { +func getDigestsFromArchive(ctx context.Context, archivePath string) ([]file.Digest, error) { archiveCloser, err := os.Open(archivePath) if err != nil { return nil, fmt.Errorf("unable to open archive path (%s): %w", archivePath, err) @@ -483,7 +483,7 @@ func getDigestsFromArchive(archivePath string) ([]file.Digest, error) { defer internal.CloseAndLogError(archiveCloser, archivePath) // grab and assign digest for the entire archive - digests, err := intFile.NewDigestsFromFile(archiveCloser, javaArchiveHashes) + digests, err := intFile.NewDigestsFromFile(ctx, archiveCloser, javaArchiveHashes) if err != nil { log.Debugf("failed to create digest for file=%q: %+v", archivePath, err) } diff --git a/syft/source/filesource/file_source.go b/syft/source/filesource/file_source.go index caeec6e55..a87ace55d 100644 --- a/syft/source/filesource/file_source.go +++ b/syft/source/filesource/file_source.go @@ -1,6 +1,7 @@ package filesource import ( + "context" "crypto" "fmt" "os" @@ -68,7 +69,7 @@ func New(cfg Config) (source.Source, error) { defer fh.Close() - digests, err = intFile.NewDigestsFromFile(fh, cfg.DigestAlgorithms) + digests, err = intFile.NewDigestsFromFile(context.TODO(), fh, cfg.DigestAlgorithms) if err != nil { return nil, fmt.Errorf("unable to calculate digests for file=%q: %w", cfg.Path, err) } diff --git a/test/cli/scan_cmd_test.go b/test/cli/scan_cmd_test.go index 0bb0da597..d76121504 100644 --- a/test/cli/scan_cmd_test.go +++ b/test/cli/scan_cmd_test.go @@ -354,7 +354,17 @@ func TestPackagesCmdFlags(t *testing.T) { args: []string{"scan", "-vvv", "-o", "json", coverageImage}, assertions: []traitAssertion{ // the application config in the log matches that of what we expect to have been configured. - assertInOutput(`parallelism: 1`), + assertInOutput(`parallelism: 0`), + assertPackageCount(coverageImageSquashedPackageCount), + assertSuccessfulReturnCode, + }, + }, + { + name: "parallelism-flag", + args: []string{"scan", "-vvv", "--parallelism", "2", "-o", "json", coverageImage}, + assertions: []traitAssertion{ + // the application config in the log matches that of what we expect to have been configured. + assertInOutput(`parallelism: 2`), assertPackageCount(coverageImageSquashedPackageCount), assertSuccessfulReturnCode, },