[wip] more concurrent catalogers

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>
Alex Goodman 2024-10-01 10:18:44 -04:00
parent 7815d8e4d9
commit edd910f88f
8 changed files with 140 additions and 47 deletions


@@ -13,6 +13,7 @@ import (
 	"github.com/wagoodman/go-progress"
 
 	"github.com/anchore/clio"
+	"github.com/anchore/go-sync"
 	"github.com/anchore/stereoscope"
 	"github.com/anchore/syft/cmd/syft/internal/options"
 	"github.com/anchore/syft/cmd/syft/internal/ui"
@@ -252,6 +253,8 @@ func generateSBOMForAttestation(ctx context.Context, id clio.Identification, opt
 		return nil, fmt.Errorf("attest requires use of an OCI registry directly, one or more of the specified sources is unsupported: %v", opts.From)
 	}
 
+	ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism))
+
 	src, err := getSource(ctx, opts, userInput, stereoscope.RegistryTag)
 	if err != nil {
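Note: this command-level change (and the same one in the scan command below) only attaches a bounded executor to the context; the catalogers later pull it back out with sync.ContextExecutor. A minimal stand-alone sketch of that shape using only the standard library — the package name, the Go method, and the fallback behavior are illustrative, not the anchore/go-sync implementation:

package ctxexec

import "context"

// executorKey is a private context key; the real go-sync key is internal to that library.
type executorKey struct{}

// Executor bounds concurrent work with a simple semaphore.
type Executor struct{ sem chan struct{} }

func NewExecutor(parallelism int) *Executor {
	if parallelism < 1 {
		parallelism = 1
	}
	return &Executor{sem: make(chan struct{}, parallelism)}
}

// Go runs fn in a goroutine, blocking while `parallelism` other calls are already in flight.
func (e *Executor) Go(fn func()) {
	e.sem <- struct{}{}
	go func() {
		defer func() { <-e.sem }()
		fn()
	}()
}

// SetContextExecutor attaches an executor to the context so every caller
// holding the context shares the same concurrency budget.
func SetContextExecutor(ctx context.Context, e *Executor) context.Context {
	return context.WithValue(ctx, executorKey{}, e)
}

// ContextExecutor retrieves the executor, falling back to a serial executor.
func ContextExecutor(ctx context.Context) *Executor {
	if e, ok := ctx.Value(executorKey{}).(*Executor); ok {
		return e
	}
	return NewExecutor(1)
}

Carrying the executor on the context means the parallelism knob set once in the CLI layer is honored by every cataloger that shares that context, without threading an extra parameter through the task interfaces.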


@@ -14,6 +14,7 @@ import (
 	"github.com/anchore/clio"
 	"github.com/anchore/go-collections"
+	"github.com/anchore/go-sync"
 	"github.com/anchore/stereoscope"
 	"github.com/anchore/stereoscope/pkg/image"
 	"github.com/anchore/syft/cmd/syft/internal/options"
@@ -167,6 +168,8 @@ func validateArgs(cmd *cobra.Command, args []string, error string) error {
 }
 
 func runScan(ctx context.Context, id clio.Identification, opts *scanOptions, userInput string) error {
+	ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism))
+
 	writer, err := opts.SBOMWriter()
 	if err != nil {
 		return err


@@ -2,6 +2,7 @@ package options
 
 import (
 	"fmt"
+	"runtime"
 	"sort"
 	"strings"
@@ -72,7 +73,7 @@ func DefaultCatalog() Catalog {
 		File:          defaultFileConfig(),
 		Relationships: defaultRelationshipsConfig(),
 		Source:        defaultSourceConfig(),
-		Parallelism:   1,
+		Parallelism:   runtime.NumCPU() * 3,
 	}
 }

go.mod

@@ -90,6 +90,7 @@ require (
 	github.com/BurntSushi/toml v1.4.0
 	github.com/OneOfOne/xxhash v1.2.8
 	github.com/adrg/xdg v0.5.0
+	github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9
 	github.com/magiconair/properties v1.8.7
 	golang.org/x/exp v0.0.0-20231108232855-2478ac86f678
 )

go.sum

@@ -109,6 +109,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj
 github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk=
 github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc=
 github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA=
+github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9 h1:CEONkqYICLuKgiIgHIcY9cxSDvQWMtDnIY01HSVCJhc=
+github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9/go.mod h1:43zWHVYBx8GXzjjISSD4rMpACGBpUZmE3Yy+qUNnhn4=
 github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8=
 github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ=
 github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods=


@@ -4,11 +4,11 @@ import (
 	"context"
 	"fmt"
 	"runtime/debug"
-	"sync"
 	"time"
 
 	"github.com/hashicorp/go-multierror"
 
+	"github.com/anchore/go-sync"
 	"github.com/anchore/syft/internal/log"
 	"github.com/anchore/syft/internal/sbomsync"
 	"github.com/anchore/syft/syft/event/monitor"
@@ -35,31 +35,48 @@ func NewTaskExecutor(tasks []Task, numWorkers int) *Executor {
 }
 
 func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error {
-	var errs error
-	wg := &sync.WaitGroup{}
-	for i := 0; i < p.numWorkers; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
-			for {
-				tsk, ok := <-p.tasks
-				if !ok {
-					return
-				}
-
-				if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
-					errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", err))
-					prog.SetError(err)
-				}
-				prog.Increment()
-			}
-		}()
-	}
-
-	wg.Wait()
-
-	return errs
+	exec := sync.ContextExecutor(ctx)
+	collector := sync.NewCollector[error](exec)
+
+	run := func(tsk Task) sync.ProviderFunc[error] {
+		return func() error {
+			if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
+				prog.SetError(err)
+				return err
+			}
+			prog.Increment()
+			return nil
+		}
+	}
+
+	for {
+		tsk, ok := <-p.tasks
+		if !ok {
+			break
+		}
+		collector.Provide(run(tsk))
+	}
+
+	errs := collector.Collect()
+	if len(errs) == 0 {
+		return nil
+	}
+
+	var nonNilErrs []error
+	for _, err := range errs {
+		if err != nil {
+			nonNilErrs = append(nonNilErrs, err)
+		}
+	}
+	if len(nonNilErrs) == 0 {
+		return nil
+	}
+	return multierror.Append(nil, nonNilErrs...)
 }
 
 func runTaskSafely(ctx context.Context, t Task, resolver file.Resolver, s sbomsync.Builder) (err error) {
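For readers unfamiliar with the collector used above: judging only from these call sites, Provide schedules a function and Collect blocks until all provided functions have run, returning their results as a slice. A rough, stdlib-only sketch of that contract (ignoring the executor bound for brevity; the real generic Collector in anchore/go-sync is presumably more involved):

package collect

import "sync"

// ProviderFunc produces a single value of type T.
type ProviderFunc[T any] func() T

// Collector fans providers out onto goroutines and gathers their results.
// This illustrates the usage pattern only, not the anchore/go-sync code.
type Collector[T any] struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	results []T
}

func NewCollector[T any]() *Collector[T] { return &Collector[T]{} }

// Provide schedules fn; its return value is recorded for Collect.
func (c *Collector[T]) Provide(fn ProviderFunc[T]) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		v := fn()
		c.mu.Lock()
		c.results = append(c.results, v)
		c.mu.Unlock()
	}()
}

// Collect waits for every provided function to finish and returns the results.
func (c *Collector[T]) Collect() []T {
	c.wg.Wait()
	return c.results
}

With this shape, Execute only has to filter the collected slice for non-nil errors and wrap them with multierror, which is exactly what the new code does in place of the old WaitGroup-and-shared-variable worker pool.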


@@ -8,6 +8,7 @@ import (
 	"github.com/dustin/go-humanize"
 
+	"github.com/anchore/go-sync"
 	stereoscopeFile "github.com/anchore/stereoscope/pkg/file"
 	"github.com/anchore/syft/internal"
 	"github.com/anchore/syft/internal/bus"
@@ -30,6 +31,12 @@ func NewCataloger(hashes []crypto.Hash) *Cataloger {
 	}
 }
 
+type result struct {
+	coordinates file.Coordinates
+	digests     []file.Digest
+	err         error
+}
+
 func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordinates ...file.Coordinates) (map[file.Coordinates][]file.Digest, error) {
 	results := make(map[file.Coordinates][]file.Digest)
 	var locations []file.Location
@@ -46,29 +53,22 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin
 		}
 	}
 
+	exec := sync.ContextExecutor(ctx)
+	collector := sync.NewCollector[result](exec)
+
 	prog := catalogingProgress(int64(len(locations)))
 
 	for _, location := range locations {
-		result, err := i.catalogLocation(resolver, location)
-
-		if errors.Is(err, ErrUndigestableFile) {
-			continue
-		}
-
-		prog.AtomicStage.Set(location.Path())
-
-		if internal.IsErrPathPermission(err) {
-			log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err)
-			continue
-		}
-
-		if err != nil {
-			prog.SetError(err)
-			return nil, fmt.Errorf("failed to process file %q: %w", location.RealPath, err)
-		}
-
-		prog.Increment()
-
-		results[location.Coordinates] = result
+		collector.Provide(i.run(resolver, location, prog))
+	}
+
+	for _, r := range collector.Collect() {
+		if r.err != nil {
+			log.Warnf("failed to process file %q: %+v", r.coordinates.RealPath, r.err)
+			continue
+		}
+		results[r.coordinates] = append(results[r.coordinates], r.digests...)
 	}
 
 	log.Debugf("file digests cataloger processed %d files", prog.Current())
@@ -79,6 +79,42 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin
 
 	return results, nil
 }
 
+func (i *Cataloger) run(resolver file.Resolver, location file.Location, prog *monitor.CatalogerTaskProgress) sync.ProviderFunc[result] {
+	return func() result {
+		digests, err := i.catalogLocation(resolver, location)
+
+		if errors.Is(err, ErrUndigestableFile) {
+			return result{
+				coordinates: location.Coordinates,
+			}
+		}
+
+		prog.AtomicStage.Set(location.Path())
+
+		if internal.IsErrPathPermission(err) {
+			log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err)
+			return result{
+				coordinates: location.Coordinates,
+			}
+		}
+
+		if err != nil {
+			prog.SetError(err)
+			return result{
+				coordinates: location.Coordinates,
+				err:         fmt.Errorf("failed to process file %q: %w", location.RealPath, err),
+			}
+		}
+
+		prog.Increment()
+
+		return result{
+			coordinates: location.Coordinates,
+			digests:     digests,
+			err:         err,
+		}
+	}
+}
+
 func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) {
 	meta, err := resolver.FileMetadataByLocation(location)
 	if err != nil {
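Behavior note for the digest cataloger: a per-file failure used to abort the whole catalog via the old `return nil, fmt.Errorf(...)` path; it is now carried in result.err and surfaced as a warning while the remaining files are still processed. A self-contained toy with the same fan-out-then-merge shape (the file names and the hashFile helper are made up for illustration, not syft code):

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"os"
	"sync"
)

type digestResult struct {
	path   string
	digest string
	err    error
}

func hashFile(path string) digestResult {
	data, err := os.ReadFile(path)
	if err != nil {
		return digestResult{path: path, err: err}
	}
	sum := sha256.Sum256(data)
	return digestResult{path: path, digest: hex.EncodeToString(sum[:])}
}

func main() {
	paths := []string{"go.mod", "go.sum"} // stand-ins for resolved file locations

	var wg sync.WaitGroup
	results := make([]digestResult, len(paths))
	for i, p := range paths {
		wg.Add(1)
		go func(i int, p string) {
			defer wg.Done()
			results[i] = hashFile(p) // each worker fills its own slot; no locking needed
		}(i, p)
	}
	wg.Wait()

	digests := map[string]string{}
	for _, r := range results {
		if r.err != nil {
			fmt.Printf("warn: failed to process file %q: %v\n", r.path, r.err)
			continue // skip the file rather than failing the whole run
		}
		digests[r.path] = r.digest
	}
	fmt.Println(digests)
}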


@@ -4,6 +4,7 @@ import (
 	"context"
 
 	"github.com/anchore/go-logger"
+	"github.com/anchore/go-sync"
 	"github.com/anchore/syft/internal"
 	"github.com/anchore/syft/internal/log"
 	"github.com/anchore/syft/syft/artifact"
@@ -147,36 +148,65 @@ func (c *Cataloger) Name() string {
 	return c.upstreamCataloger
 }
 
+type parserResult struct {
+	packages      []pkg.Package
+	relationships []artifact.Relationship
+	err           error
+}
+
 // Catalog is given an object to resolve file references and content, this function returns any discovered Packages after analyzing the catalog source.
 func (c *Cataloger) Catalog(ctx context.Context, resolver file.Resolver) ([]pkg.Package, []artifact.Relationship, error) {
 	var packages []pkg.Package
 	var relationships []artifact.Relationship
 
-	logger := log.Nested("cataloger", c.upstreamCataloger)
-
 	env := Environment{
 		// TODO: consider passing into the cataloger, this would affect the cataloger interface (and all implementations). This can be deferred until later.
 		LinuxRelease: linux.IdentifyRelease(resolver),
 	}
 
+	exec := sync.ContextExecutor(ctx)
+	collector := sync.NewCollector[parserResult](exec)
+
 	for _, req := range c.selectFiles(resolver) {
+		collector.Provide(c.run(ctx, resolver, req, env))
+	}
+
+	results := collector.Collect()
+	for _, result := range results {
+		packages = append(packages, result.packages...)
+		relationships = append(relationships, result.relationships...)
+	}
+
+	return c.process(ctx, resolver, packages, relationships, nil)
+}
+
+func (c *Cataloger) run(ctx context.Context, resolver file.Resolver, req request, env Environment) sync.ProviderFunc[parserResult] {
+	return func() parserResult {
+		lgr := log.Nested("cataloger", c.upstreamCataloger)
 		location, parser := req.Location, req.Parser
 
 		log.WithFields("path", location.RealPath).Trace("parsing file contents")
 
-		discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, logger, parser, &env)
+		discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, lgr, parser, &env)
 		if err != nil {
-			continue // logging is handled within invokeParser
+			// note: logging is handled within invokeParser
+			return parserResult{
+				err: err,
+			}
 		}
 
-		for _, p := range discoveredPackages {
-			p.FoundBy = c.upstreamCataloger
-			packages = append(packages, p)
+		for i := range discoveredPackages {
+			discoveredPackages[i].FoundBy = c.upstreamCataloger
 		}
 
-		relationships = append(relationships, discoveredRelationships...)
+		return parserResult{
+			packages:      discoveredPackages,
+			relationships: discoveredRelationships,
+		}
 	}
-
-	return c.process(ctx, resolver, packages, relationships, nil)
 }
 
 func (c *Cataloger) process(ctx context.Context, resolver file.Resolver, pkgs []pkg.Package, rels []artifact.Relationship, err error) ([]pkg.Package, []artifact.Relationship, error) {
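One small detail in the last hunk: the old loop set FoundBy on the loop variable p (a copy) and appended that copy, whereas the new code appends discoveredPackages wholesale and therefore has to mutate the elements in place by index. Ranging by value alone would not update the slice, as this toy shows (Item stands in for pkg.Package):

package main

import "fmt"

// Item stands in for pkg.Package; FoundBy for the field set by the cataloger.
type Item struct{ FoundBy string }

func main() {
	items := []Item{{}, {}}

	// Ranging by value: v is a copy, so the slice is left untouched.
	for _, v := range items {
		v.FoundBy = "copy-only"
	}
	fmt.Println(items) // [{} {}]

	// Indexing mutates the slice elements themselves, which is what the new
	// cataloger loop relies on since the whole slice is appended afterwards.
	for i := range items {
		items[i].FoundBy = "some-cataloger"
	}
	fmt.Println(items) // [{some-cataloger} {some-cataloger}]
}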