syft/syft/pkg/cataloger/catalog.go
Alex Goodman 4adfbeb5f0
Generalize UI events for cataloging tasks (#2369)
* generalize ui events for cataloging tasks

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* moderate review comments

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* incorporate review comments

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* rename cataloger task progress object

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* migrate cataloger task fn to bus helper

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

---------

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>
2023-11-30 16:25:50 +00:00

230 lines
7.2 KiB
Go

package cataloger
import (
	"fmt"
	"math"
	"runtime/debug"
	"sync"
	"sync/atomic"

	"github.com/dustin/go-humanize"
	"github.com/hashicorp/go-multierror"
	"github.com/wagoodman/go-progress"

	"github.com/anchore/syft/internal/bus"
	"github.com/anchore/syft/internal/log"
	"github.com/anchore/syft/syft/artifact"
	"github.com/anchore/syft/syft/event/monitor"
	"github.com/anchore/syft/syft/file"
	"github.com/anchore/syft/syft/linux"
	"github.com/anchore/syft/syft/pkg"
	"github.com/anchore/syft/syft/pkg/cataloger/common/cpe"
)
// Monitor provides progress-related data for observing the progress of a Catalog() call (published on the event bus).
type Monitor struct {
	// FilesProcessed is the number of files selected and contents analyzed from all registered catalogers.
	FilesProcessed progress.Monitorable
	// PackagesDiscovered is the number of packages discovered from all registered catalogers.
	PackagesDiscovered progress.Monitorable
}
// catalogResult provides the result of running a single cataloger against source.
type catalogResult struct {
	// Packages holds all packages produced by the cataloger (after CPE/language post-processing).
	Packages []pkg.Package
	// Relationships holds both cataloger-reported relationships and derived file-ownership relationships.
	Relationships []artifact.Relationship
	// Discovered may sometimes be more than len(Packages) (it is the raw count reported by the cataloger).
	Discovered int64
	// Error is any error (or recovered panic) raised while running the cataloger; aggregated by the caller.
	Error error
}
// runCataloger runs a single cataloger against the given resolver and post-processes each discovered
// package: CPEs are generated (or found in the CPE dictionary), the language is derived from the PURL
// when missing, and file-to-package ownership relationships are created. Panics raised by the cataloger
// are recovered and returned as an error (with a stack trace) so one misbehaving cataloger cannot take
// down the whole catalog run. The returned *catalogResult is always non-nil.
func runCataloger(cataloger pkg.Cataloger, resolver file.Resolver) (catalogerResult *catalogResult, err error) {
	// handle individual cataloger panics
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("%v at:\n%s", e, string(debug.Stack()))
		}
	}()

	catalogerResult = new(catalogResult)

	// find packages from the underlying raw data
	log.WithFields("cataloger", cataloger.Name()).Trace("cataloging started")
	packages, relationships, err := cataloger.Catalog(resolver)
	if err != nil {
		// include the underlying error in the log fields, otherwise the failure reason is lost
		// to anyone reading the log output (the error is also returned for aggregation)
		log.WithFields("cataloger", cataloger.Name(), "error", err).Warn("error while cataloging")
		return catalogerResult, err
	}

	catalogedPackages := len(packages)

	log.WithFields("cataloger", cataloger.Name()).Debugf("discovered %d packages", catalogedPackages)
	catalogerResult.Discovered = int64(catalogedPackages)

	for _, p := range packages {
		// generate CPEs (note: this is excluded from package ID, so is safe to mutate)
		// we might have binary classified CPE already with the package so we want to append here
		dictionaryCPE, ok := cpe.DictionaryFind(p)
		if ok {
			log.Debugf("used CPE dictionary to find CPE for %s package %q: %s", p.Type, p.Name, dictionaryCPE.BindToFmtString())
			p.CPEs = append(p.CPEs, dictionaryCPE)
		} else {
			p.CPEs = append(p.CPEs, cpe.Generate(p)...)
		}

		// if we were not able to identify the language we have an opportunity
		// to try and get this value from the PURL. Worst case we assert that
		// we could not identify the language at either stage and set UnknownLanguage
		if p.Language == "" {
			p.Language = pkg.LanguageFromPURL(p.PURL)
		}

		// create file-to-package relationships for files owned by the package
		owningRelationships, err := packageFileOwnershipRelationships(p, resolver)
		if err != nil {
			// best-effort: a failure to derive ownership relationships should not fail the cataloger
			log.WithFields("cataloger", cataloger.Name(), "package", p.Name, "error", err).Warnf("unable to create any package-file relationships")
		} else {
			catalogerResult.Relationships = append(catalogerResult.Relationships, owningRelationships...)
		}
		catalogerResult.Packages = append(catalogerResult.Packages, p)
	}

	catalogerResult.Relationships = append(catalogerResult.Relationships, relationships...)
	log.WithFields("cataloger", cataloger.Name()).Trace("cataloging complete")
	return catalogerResult, err
}
// Catalog a given source (container image or filesystem) with the given catalogers, returning all discovered packages.
// In order to efficiently retrieve contents from a underlying container image the content fetch requests are
// done in bulk. Specifically, all files of interest are collected from each catalogers and accumulated into a single
// request.
//
//nolint:funlen
func Catalog(resolver file.Resolver, _ *linux.Release, parallelism int, catalogers ...pkg.Cataloger) (*pkg.Collection, []artifact.Relationship, error) {
	catalog := pkg.NewCollection()
	var allRelationships []artifact.Relationship

	prog := monitorPackageCatalogingTask()

	// perform analysis, accumulating errors for each failed analysis
	var errs error

	nCatalogers := len(catalogers)

	// we do not need more parallelism than there are `catalogers` (and always use at least one worker).
	parallelism = int(math.Min(float64(nCatalogers), math.Max(1.0, float64(parallelism))))
	log.WithFields("parallelism", parallelism, "catalogers", nCatalogers).Debug("cataloging packages")

	jobs := make(chan pkg.Cataloger, nCatalogers)
	results := make(chan *catalogResult, nCatalogers)

	waitGroup := sync.WaitGroup{}

	// updated concurrently by every worker goroutine below, so all access must be atomic
	// (a plain += here is a data race under `go test -race`)
	var totalPackagesDiscovered int64

	for i := 0; i < parallelism; i++ {
		waitGroup.Add(1)
		go func() {
			defer waitGroup.Done()

			// wait for / get the next cataloger job available.
			for cataloger := range jobs {
				result, err := runCataloger(cataloger, resolver)

				// ensure we set the error to be aggregated
				result.Error = err

				prog.Add(result.Discovered)
				count := humanize.Comma(atomic.AddInt64(&totalPackagesDiscovered, result.Discovered))
				prog.AtomicStage.Set(fmt.Sprintf("%s packages", count))

				results <- result
			}
		}()
	}

	// enqueue the jobs (results channel is buffered to nCatalogers, so workers never block on send)
	for _, cataloger := range catalogers {
		jobs <- cataloger
	}
	close(jobs)

	// wait for the jobs to finish
	waitGroup.Wait()
	close(results)

	// collect the results
	for result := range results {
		if result.Error != nil {
			errs = multierror.Append(errs, result.Error)
		}
		for _, p := range result.Packages {
			catalog.Add(p)
		}
		allRelationships = append(allRelationships, result.Relationships...)
	}

	allRelationships = append(allRelationships, pkg.NewRelationships(catalog)...)

	if errs != nil {
		prog.SetError(errs)
	} else {
		prog.SetCompleted()
	}

	return catalog, allRelationships, errs
}
// packageFileOwnershipRelationships derives file-to-package "contains" relationships for every file
// path that the given package claims to own (per its FileOwner metadata, when present). Packages
// without FileOwner metadata yield no relationships and no error.
func packageFileOwnershipRelationships(p pkg.Package, resolver file.PathResolver) ([]artifact.Relationship, error) {
	fileOwner, ok := p.Metadata.(pkg.FileOwner)
	if !ok {
		// this package's metadata does not report owned files; nothing to relate
		return nil, nil
	}

	// de-duplicate resolved locations by coordinate ID (the same path may resolve to multiple refs)
	ownedLocations := map[artifact.ID]file.Location{}
	for _, ownedPath := range fileOwner.OwnedFiles() {
		refs, err := resolver.FilesByPath(ownedPath)
		if err != nil {
			return nil, fmt.Errorf("unable to find path for path=%q: %w", ownedPath, err)
		}

		if len(refs) == 0 {
			// ideally we want to warn users about missing files from a package, however, it is very common for
			// container image authors to delete files that are not needed in order to keep image sizes small. Adding
			// a warning here would be needlessly noisy (even for popular base images).
			continue
		}

		for _, ref := range refs {
			id := ref.Coordinates.ID()
			if existing, seen := ownedLocations[id]; seen {
				log.Debugf("found path duplicate of %s", existing.RealPath)
			}
			ownedLocations[id] = ref
		}
	}

	var relationships []artifact.Relationship
	for _, location := range ownedLocations {
		relationships = append(relationships, artifact.Relationship{
			From: p,
			To:   location.Coordinates,
			Type: artifact.ContainsRelationship,
		})
	}
	return relationships, nil
}
// monitorPackageCatalogingTask publishes a new package-cataloging task on the event bus and returns
// the progress handle that workers use to report package counts while cataloging runs.
func monitorPackageCatalogingTask() *monitor.CatalogerTaskProgress {
	return bus.StartCatalogerTask(
		monitor.GenericTask{
			Title: monitor.Title{
				Default:      "Catalog packages",
				WhileRunning: "Cataloging packages",
				OnSuccess:    "Cataloged packages",
			},
			HideOnSuccess: false,
		},
		-1, // indeterminate size: total package count is not known up front
		"",
	)
}