From 33e0227d1ccf4d3cbd0d4abee74783c73c0f6070 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Fri, 4 Jan 2019 11:06:06 +0000 Subject: [PATCH] [Auditbeat] Report process errors (#9693) (#9845) Changes the process metricset to keep iterating through processes even when an unexpected error occurs. The error will be stored in the Process object and sent to Elasticsearch as well as logged as a warning. This only happens the first time the error is encountered for a process, not on subsequent collection cycles. (cherry picked from commit 2cd7c4283fd50ba9ea6753e94dcc8272c0804934) --- .../module/system/process/process.go | 194 +++++++++++------- 1 file changed, 123 insertions(+), 71 deletions(-) diff --git a/x-pack/auditbeat/module/system/process/process.go b/x-pack/auditbeat/module/system/process/process.go index 7bcf1507c7e..be83e597403 100644 --- a/x-pack/auditbeat/module/system/process/process.go +++ b/x-pack/auditbeat/module/system/process/process.go @@ -36,6 +36,7 @@ const ( eventTypeState = "state" eventTypeEvent = "event" + eventTypeError = "error" ) type eventAction uint8 @@ -44,6 +45,7 @@ const ( eventActionExistingProcess eventAction = iota eventActionProcessStarted eventActionProcessStopped + eventActionProcessError ) func (action eventAction) String() string { @@ -54,6 +56,8 @@ func (action eventAction) String() string { return "process_started" case eventActionProcessStopped: return "process_stopped" + case eventActionProcessError: + return "process_error" default: return "" } @@ -78,29 +82,30 @@ type MetricSet struct { suppressPermissionWarnings bool } -// ProcessInfo wraps the process information and implements cache.Cacheable. -type ProcessInfo struct { - types.ProcessInfo +// Process represents information about a process. +type Process struct { + Info types.ProcessInfo + Error error } -// Hash creates a hash for ProcessInfo. -func (pInfo ProcessInfo) Hash() uint64 { +// Hash creates a hash for Process. +func (p Process) Hash() uint64 { h := xxhash.New64() - h.WriteString(strconv.Itoa(pInfo.PID)) - h.WriteString(pInfo.StartTime.String()) + h.WriteString(strconv.Itoa(p.Info.PID)) + h.WriteString(p.Info.StartTime.String()) return h.Sum64() } -func (pInfo ProcessInfo) toMapStr() common.MapStr { +func (p Process) toMapStr() common.MapStr { return common.MapStr{ // https://github.com/elastic/ecs#-process-fields - "name": pInfo.Name, - "args": pInfo.Args, - "pid": pInfo.PID, - "ppid": pInfo.PPID, - "working_directory": pInfo.CWD, - "executable": pInfo.Exe, - "start": pInfo.StartTime, + "name": p.Info.Name, + "args": p.Info.Args, + "pid": p.Info.PID, + "ppid": p.Info.PPID, + "working_directory": p.Info.CWD, + "executable": p.Info.Exe, + "start": p.Info.StartTime, } } @@ -142,6 +147,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { ms.log.Debug("No state timestamp found") } + if os.Geteuid() != 0 { + ms.log.Warn("Running as non-root user, will likely not report all processes.") + } + return ms, nil } @@ -181,25 +190,30 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { ms.lastState = time.Now() } - processInfos, err := ms.getProcessInfos() + processes, err := ms.getProcesses() if err != nil { return errors.Wrap(err, "failed to get process infos") } - ms.log.Debugf("Found %v processes", len(processInfos)) + ms.log.Debugf("Found %v processes", len(processes)) stateID, err := uuid.NewV4() if err != nil { return errors.Wrap(err, "error generating state ID") } - for _, pInfo := range processInfos { - event := processEvent(pInfo, eventTypeState, eventActionExistingProcess) - event.RootFields.Put("event.id", stateID.String()) - report.Event(event) + for _, p := range processes { + if p.Error == nil { + event := processEvent(p, eventTypeState, eventActionExistingProcess) + event.RootFields.Put("event.id", stateID.String()) + report.Event(event) + } else { + ms.log.Warn(p.Error) + report.Event(processEvent(p, eventTypeError, eventActionProcessError)) + } } if ms.cache != nil { // This will initialize the cache with the current processes - ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos)) + ms.cache.DiffAndUpdateCache(convertToCacheable(processes)) } // Save time so we know when to send the state again (config.StatePeriod) @@ -217,39 +231,60 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { // reportChanges detects and reports any changes to processes on this system since the last call. func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { - processInfos, err := ms.getProcessInfos() + processes, err := ms.getProcesses() if err != nil { - return errors.Wrap(err, "failed to get process infos") + return errors.Wrap(err, "failed to get processes") } - ms.log.Debugf("Found %v processes", len(processInfos)) + ms.log.Debugf("Found %v processes", len(processes)) - started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos)) + started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processes)) - for _, pInfo := range started { - report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStarted)) + for _, cacheValue := range started { + p := cacheValue.(*Process) + + if p.Error == nil { + report.Event(processEvent(p, eventTypeEvent, eventActionProcessStarted)) + } else { + ms.log.Warn(p.Error) + report.Event(processEvent(p, eventTypeError, eventActionProcessError)) + } } - for _, pInfo := range stopped { - report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStopped)) + for _, cacheValue := range stopped { + p := cacheValue.(*Process) + + if p.Error == nil { + report.Event(processEvent(p, eventTypeEvent, eventActionProcessStopped)) + } } return nil } -func processEvent(pInfo *ProcessInfo, eventType string, action eventAction) mb.Event { - return mb.Event{ +func processEvent(process *Process, eventType string, action eventAction) mb.Event { + event := mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ "kind": eventType, "action": action.String(), }, - "process": pInfo.toMapStr(), - "message": processMessage(pInfo, action), + "process": process.toMapStr(), + "message": processMessage(process, action), }, } + + if process.Error != nil { + event.RootFields.Put("error.message", process.Error.Error()) + } + + return event } -func processMessage(pInfo *ProcessInfo, action eventAction) string { +func processMessage(process *Process, action eventAction) string { + if process.Error != nil { + return fmt.Sprintf("ERROR for PID %d: %v", process.Info.PID, process.Error) + } + var actionString string switch action { case eventActionProcessStarted: @@ -261,20 +296,20 @@ func processMessage(pInfo *ProcessInfo, action eventAction) string { } return fmt.Sprintf("Process %v (PID: %d) %v", - pInfo.Name, pInfo.PID, actionString) + process.Info.Name, process.Info.PID, actionString) } -func convertToCacheable(processInfos []*ProcessInfo) []cache.Cacheable { - c := make([]cache.Cacheable, 0, len(processInfos)) +func convertToCacheable(processes []*Process) []cache.Cacheable { + c := make([]cache.Cacheable, 0, len(processes)) - for _, p := range processInfos { + for _, p := range processes { c = append(c, p) } return c } -func (ms *MetricSet) getProcessInfos() ([]*ProcessInfo, error) { +func (ms *MetricSet) getProcesses() ([]*Process, error) { // TODO: Implement Processes() in go-sysinfo // e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41 pids, err := process.Pids() @@ -282,56 +317,73 @@ func (ms *MetricSet) getProcessInfos() ([]*ProcessInfo, error) { return nil, errors.Wrap(err, "failed to fetch the list of PIDs") } - var processInfos []*ProcessInfo - + var processes []*Process for _, pid := range pids { - process, err := sysinfo.Process(pid) + var process *Process + + sysinfoProc, err := sysinfo.Process(pid) if err != nil { if os.IsNotExist(err) { // Skip - process probably just terminated since our call // to Pids() continue } - return nil, errors.Wrap(err, "failed to load process") - } - pInfo, err := process.Info() - if err != nil { - if os.IsNotExist(err) { - // Skip - process probably just terminated since our call - // to Pids() - continue + // Record what we can and continue + process = &Process{ + Info: types.ProcessInfo{ + PID: pid, + }, + Error: errors.Wrapf(err, "failed to load process with PID %d", pid), } + } else { + pInfo, err := sysinfoProc.Info() + if err == nil { + process = &Process{ + Info: pInfo, + } + } else { + if os.IsNotExist(err) { + // Skip - process probably just terminated since our call + // to Pids() + continue + } + + if os.Geteuid() != 0 { + if os.IsPermission(err) || runtime.GOOS == "darwin" { + /* + Running as non-root, permission issues when trying to access other user's private + process information are expected. - if os.Geteuid() != 0 { - if os.IsPermission(err) || runtime.GOOS == "darwin" { - /* - Running as non-root, permission issues when trying to access other user's private - process information are expected. + Unfortunately, for darwin os.IsPermission() does not + work because it is a custom error created using errors.New() in + getProcTaskAllInfo() in go-sysinfo/providers/darwin/process_darwin_amd64.go - Unfortunately, for darwin os.IsPermission() does not - work because it is a custom error created using errors.New() in - getProcTaskAllInfo() in go-sysinfo/providers/darwin/process_darwin_amd64.go + TODO: Fix go-sysinfo to have better error for darwin. + */ + if !ms.suppressPermissionWarnings { + ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+ + "Will suppress further errors of this kind. Error: %v", pid, err) - TODO: Fix go-sysinfo to have better error for darwin. - */ - if !ms.suppressPermissionWarnings { - ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+ - "Will suppress further errors of this kind. Error: %v", pid, err) + // Only warn once at the start of Auditbeat. + ms.suppressPermissionWarnings = true + } - // Only warn once at the start of Auditbeat. - ms.suppressPermissionWarnings = true + //continue } + } - continue + // Record what we can and continue + process = &Process{ + Info: pInfo, + Error: errors.Wrapf(err, "failed to load process information for PID %d", pid), } + process.Info.PID = pid // in case pInfo did not contain it } - - return nil, errors.Wrap(err, "failed to load process information") } - processInfos = append(processInfos, &ProcessInfo{pInfo}) + processes = append(processes, process) } - return processInfos, nil + return processes, nil }