Cherry-pick #28868 to 8.0: Enhance add_kubernetes_metadata matcher (#30527)

* Enhance add_kubernetes_metadata matcher (#28868)

* add documentation for add_kubernetes_metadata matcher; support 'var/log/pods' for resource_type: pod

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* Apply suggestions from code review

Co-authored-by: Chris Mark <chrismarkou92@gmail.com>

* add record to CHANGELOG.next.asciidoc

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* address comments: log pod id instead of array; log event

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* add validation for logs_path matchers config

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* add comment for the config validation check

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* set different sourcePath for windows in tests

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

Co-authored-by: Chris Mark <chrismarkou92@gmail.com>
(cherry picked from commit ac8275f)

* Update CHANGELOG.next.asciidoc
tetianakravchenko committed Feb 23, 2022
1 parent 3adc10d commit d94627b
Showing 6 changed files with 223 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -79,6 +79,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Add support for '/var/log/pods/' path for add_kubernetes_metadata processor with `resource_type: pod`. {pull}28868[28868]
- Add documentation for the add_kubernetes_metadata processor's `logs_path` matcher. {pull}28868[28868]

*Heartbeat*

100 changes: 63 additions & 37 deletions filebeat/processor/add_kubernetes_metadata/matchers.go
@@ -78,59 +78,78 @@ func newLogsPathMatcher(cfg common.Config) (add_kubernetes_metadata.Matcher, err
// Docker container ID is a 64-character-long hexadecimal string
const containerIdLen = 64

// Pod UID is on the 5th index of the path directories
const podUIDPos = 5

func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string {
value, err := event.GetValue("log.file.path")
if err == nil {
source := value.(string)
f.logger.Debugf("Incoming log.file.path value: %s", source)
if err != nil {
f.logger.Debugf("Error extracting log.file.path from the event: %s.", event)
return ""
}

if !strings.Contains(source, f.LogsPath) {
f.logger.Errorf("Error extracting container id - source value does not contain matcher's logs_path '%s'.", f.LogsPath)
return ""
}
source := value.(string)
f.logger.Debugf("Incoming log.file.path value: %s", source)

if !strings.Contains(source, f.LogsPath) {
f.logger.Errorf("Error extracting container id - source value does not contain matcher's logs_path '%s'.", f.LogsPath)
return ""
}

sourceLen := len(source)
logsPathLen := len(f.LogsPath)
sourceLen := len(source)
logsPathLen := len(f.LogsPath)

if f.ResourceType == "pod" {
// Specify a pod resource type when manually mounting log volumes and they end up under "/var/lib/kubelet/pods/"
// This will extract only the pod UID, which offers less granularity of metadata when compared to the container ID
if strings.HasPrefix(f.LogsPath, podLogsPath()) && strings.HasSuffix(source, ".log") {
if f.ResourceType == "pod" {
// Pod resource type will extract only the pod UID, which offers less granularity of metadata when compared to the container ID
if strings.HasSuffix(source, ".log") {
// Specify a pod resource type when writing logs into a manually mounted log volume;
// those logs appear under "/var/lib/kubelet/pods/<pod_id>/volumes/..."
if strings.HasPrefix(f.LogsPath, podKubeletLogsPath()) {
pathDirs := strings.Split(source, pathSeparator)
podUIDPos := 5
if len(pathDirs) > podUIDPos {
podUID := strings.Split(source, pathSeparator)[podUIDPos]

f.logger.Debugf("Using pod uid: %s", podUID)
return podUID
}

f.logger.Error("Error extracting pod uid - source value contains matcher's logs_path, however it is too short to contain a Pod UID.")
}
} else {
// In case of the Kubernetes log path "/var/log/containers/",
// the container ID will be located right before the ".log" extension.
if strings.HasPrefix(f.LogsPath, containerLogsPath()) && strings.HasSuffix(source, ".log") && sourceLen >= containerIdLen+4 {
containerIDEnd := sourceLen - 4
cid := source[containerIDEnd-containerIdLen : containerIDEnd]
f.logger.Debugf("Using container id: %s", cid)
return cid
}

// In any other case, we assume the container ID will follow right after the log path.
// However we need to check the length to prevent "slice bound out of range" runtime errors.
if sourceLen >= logsPathLen+containerIdLen {
cid := source[logsPathLen : logsPathLen+containerIdLen]
f.logger.Debugf("Using container id: %s", cid)
return cid
// In case of the Kubernetes log path "/var/log/pods/",
// the pod ID will be extracted from the directory name,
// file name example: "/var/log/pods/'<namespace>_<pod_name>_<pod_uid>'/container_name/0.log".
if strings.HasPrefix(f.LogsPath, podLogsPath()) {
pathDirs := strings.Split(source, pathSeparator)
podUIDPos := 4
if len(pathDirs) > podUIDPos {
podUID := strings.Split(pathDirs[podUIDPos], "_")
if len(podUID) > 2 {
f.logger.Debugf("Using pod uid: %s", podUID[2])
return podUID[2]
}
}
}

f.logger.Error("Error extracting container id - source value contains matcher's logs_path, however it is too short to contain a Docker container ID.")
f.logger.Error("Error extracting pod uid - source value does not contain matcher's logs_path")
return ""
}
}
// In case of the Kubernetes log path "/var/log/containers/",
// the container ID will be located right before the ".log" extension.
// file name example: /var/log/containers/<pod_name>_<namespace>_<container_name>-<container_id>.log
if strings.HasPrefix(f.LogsPath, containerLogsPath()) && strings.HasSuffix(source, ".log") && sourceLen >= containerIdLen+4 {
containerIDEnd := sourceLen - 4
cid := source[containerIDEnd-containerIdLen : containerIDEnd]
f.logger.Debugf("Using container id: %s", cid)
return cid
}

// In any other case, we assume the container ID follows right after the log path,
// as it does for the default log path /var/lib/docker/containers/.
// We still need to check the length to prevent "slice bounds out of range" runtime errors.
// File name example: /var/lib/docker/containers/<container_id>/<container_id>-json.log
if sourceLen >= logsPathLen+containerIdLen {
cid := source[logsPathLen : logsPathLen+containerIdLen]
f.logger.Debugf("Using container id: %s", cid)
return cid
}

f.logger.Error("Error extracting container id - source value contains matcher's logs_path, however it is too short to contain a Docker container ID.")
return ""
}

@@ -141,13 +160,20 @@ func defaultLogPath() string {
return "/var/lib/docker/containers/"
}

func podLogsPath() string {
func podKubeletLogsPath() string {
if runtime.GOOS == "windows" {
return "C:\\var\\lib\\kubelet\\pods\\"
}
return "/var/lib/kubelet/pods/"
}

func podLogsPath() string {
if runtime.GOOS == "windows" {
return "C:\\var\\log\\pods\\"
}
return "/var/log/pods/"
}

func containerLogsPath() string {
if runtime.GOOS == "windows" {
return "C:\\var\\log\\containers\\"
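
The pod-oriented branches of `MetadataIndex` above can be illustrated outside of Beats. The following is a minimal sketch, not the committed implementation: `podUIDFromPath` is a hypothetical helper, it assumes Linux-style `/` separators only, and it leaves out the logger and the Windows path variants.

```go
package main

import (
	"fmt"
	"strings"
)

// podUIDFromPath is a hypothetical helper (not part of Beats) that mirrors the
// two resource_type "pod" branches of MetadataIndex for Linux-style paths.
// It returns "" when no pod UID can be extracted.
func podUIDFromPath(logsPath, source string) string {
	// The source must contain the configured logs_path and be a ".log" file.
	if !strings.Contains(source, logsPath) || !strings.HasSuffix(source, ".log") {
		return ""
	}
	dirs := strings.Split(source, "/")
	switch {
	case strings.HasPrefix(logsPath, "/var/lib/kubelet/pods/"):
		// /var/lib/kubelet/pods/<pod_uid>/volumes/...
		// After splitting on "/", the pod UID sits at index 5.
		if len(dirs) > 5 {
			return dirs[5]
		}
	case strings.HasPrefix(logsPath, "/var/log/pods/"):
		// /var/log/pods/<namespace>_<pod_name>_<pod_uid>/<container_name>/0.log
		// The directory at index 4 carries the UID as its third "_"-separated field.
		if len(dirs) > 4 {
			parts := strings.Split(dirs[4], "_")
			if len(parts) > 2 {
				return parts[2]
			}
		}
	}
	return ""
}

func main() {
	// The path values below are made-up samples for illustration.
	fmt.Println(podUIDFromPath("/var/log/pods/", "/var/log/pods/default_web-0_1234abcd/nginx/0.log"))
	fmt.Println(podUIDFromPath("/var/lib/kubelet/pods/", "/var/lib/kubelet/pods/1234abcd/volumes/logs/app.log"))
}
```

A directory name without the `<namespace>_<pod_name>_<pod_uid>` shape yields an empty result, which is the case the `InvalidVarLogPodIDFormat` test in this commit exercises.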
30 changes: 30 additions & 0 deletions filebeat/processor/add_kubernetes_metadata/matchers_test.go
@@ -117,6 +117,36 @@ func TestLogsPathMatcher_InvalidSource4(t *testing.T) {
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func TestLogsPathMatcher_InvalidVarLogPodSource(t *testing.T) {
cfgLogsPath := "/var/log/pods/"
cfgResourceType := "pod"
source := fmt.Sprintf("/invalid/dir/namespace_pod-name_%s/container/0.log", puid)
expectedResult := ""
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func TestLogsPathMatcher_InvalidVarLogPodIDFormat(t *testing.T) {
cfgLogsPath := "/var/log/pods/"
cfgResourceType := "pod"
source := fmt.Sprintf("/var/log/pods/%s/container/0.log", puid)
expectedResult := ""
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func TestLogsPathMatcher_ValidVarLogPod(t *testing.T) {
cfgLogsPath := "/var/log/pods/"
cfgResourceType := "pod"
sourcePath := "/var/log/pods/namespace_pod-name_%s/container/0.log"

if runtime.GOOS == "windows" {
cfgLogsPath = "C:\\var\\log\\pods\\"
sourcePath = "C:\\var\\log\\pods\\namespace_pod-name_%s\\container\\0.log"
}
source := fmt.Sprintf(sourcePath, puid)
expectedResult := puid
executeTestWithResourceType(t, cfgLogsPath, cfgResourceType, source, expectedResult)
}

func executeTest(t *testing.T, cfgLogsPath string, source string, expectedResult string) {
executeTestWithResourceType(t, cfgLogsPath, "", source, expectedResult)
}
32 changes: 32 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config.go
@@ -67,5 +67,37 @@ func (k *kubeAnnotatorConfig) Validate() error {
k.Host = ""
}

// Checks below were added to warn the users early on and avoid initialising the processor in case the `logs_path`
// matcher config is not valid: supported paths defined as a `logs_path` configuration setting are strictly defined
// if `resource_type` is set
for _, matcher := range k.Matchers {
if matcherCfg, ok := matcher["logs_path"]; ok {
if matcherCfg.HasField("resource_type") {
logsPathMatcher := struct {
LogsPath string `config:"logs_path"`
ResourceType string `config:"resource_type"`
}{}

err := matcherCfg.Unpack(&logsPathMatcher)
if err != nil {
return fmt.Errorf("fail to unpack the `logs_path` matcher configuration: %s", err)
}
if logsPathMatcher.LogsPath == "" {
return fmt.Errorf("invalid logs_path matcher configuration: when resource_type is defined, logs_path must be set as well")
}
if logsPathMatcher.ResourceType != "pod" && logsPathMatcher.ResourceType != "container" {
return fmt.Errorf("invalid resource_type %s, valid values include `pod`, `container`", logsPathMatcher.ResourceType)
}
if logsPathMatcher.ResourceType == "pod" && !(logsPathMatcher.LogsPath == "/var/lib/kubelet/pods/" || logsPathMatcher.LogsPath == "/var/log/pods/") {
return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid values include `/var/lib/kubelet/pods/`, `/var/log/pods/`", logsPathMatcher.ResourceType)
}
if logsPathMatcher.ResourceType == "container" && logsPathMatcher.LogsPath != "/var/log/containers/" {
return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid value is `/var/log/containers/`", logsPathMatcher.ResourceType)
}
}

}
}

return nil
}
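
The rules enforced by the new `Validate()` checks can be restated without the Beats config types. This is a rough sketch under stated assumptions: `validateLogsPathMatcher` is a hypothetical function, it takes the two settings as plain strings, and it covers only the case where `resource_type` is set (the committed code skips these checks otherwise). The error texts are illustrative, not the ones in the commit.

```go
package main

import (
	"errors"
	"fmt"
)

// validateLogsPathMatcher is a hypothetical, dependency-free restatement of the
// checks applied when resource_type is present in a logs_path matcher config.
func validateLogsPathMatcher(logsPath, resourceType string) error {
	// logs_path is mandatory once resource_type is defined.
	if logsPath == "" {
		return errors.New("when resource_type is defined, logs_path must be set as well")
	}
	switch resourceType {
	case "pod":
		// Only the two pod log locations are accepted.
		if logsPath != "/var/lib/kubelet/pods/" && logsPath != "/var/log/pods/" {
			return fmt.Errorf("invalid logs_path %q for resource_type pod", logsPath)
		}
	case "container":
		// Only the Kubernetes container log location is accepted.
		if logsPath != "/var/log/containers/" {
			return fmt.Errorf("invalid logs_path %q for resource_type container", logsPath)
		}
	default:
		return fmt.Errorf("invalid resource_type %q, valid values include pod, container", resourceType)
	}
	return nil
}

func main() {
	fmt.Println(validateLogsPathMatcher("/var/log/pods/", "pod"))
	fmt.Println(validateLogsPathMatcher("/var/log/invalid/path/", "pod"))
}
```

The accepted values are compared as exact strings, trailing slash included, which matches the comparisons in the diff above.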
73 changes: 73 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config_test.go
@@ -60,3 +60,76 @@ func TestConfigValidate(t *testing.T) {
}
}
}

func TestConfigValidate_LogsPatchMatcher(t *testing.T) {
tests := []struct {
matcherName string
matcherConfig map[string]interface{}
error bool
}{
{
matcherName: "",
matcherConfig: map[string]interface{}{},
error: false,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
},
error: true,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
"invalid_field": "invalid_value",
},
error: true,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
"logs_path": "/var/log/invalid/path/",
},
error: true,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "pod",
"logs_path": "/var/log/pods/",
},
error: false,
},
{
matcherName: "logs_path",
matcherConfig: map[string]interface{}{
"resource_type": "container",
"logs_path": "/var/log/containers/",
},
error: false,
},
}

for _, test := range tests {
cfg, _ := common.NewConfigFrom(test.matcherConfig)

c := defaultKubernetesAnnotatorConfig()
c.DefaultMatchers = Enabled{false}

err := cfg.Unpack(&c)
c.Matchers = PluginConfig{
{
test.matcherName: *cfg,
},
}
err = c.Validate()
if test.error {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
}
}
@@ -83,10 +83,29 @@ the `log.file.path` field.
This matcher has the following configuration settings:

`logs_path`:: (Optional) Base path of container logs. If not specified, it uses
the default logs path of the platform where {beatname_uc} is running.
`resource_type`:: (Optional) Type of the resource to obtain the ID of. It can be
`pod`, to make the lookup based on the pod UID, or `container`, to make the
lookup based on the container ID. It defaults to `container`.
the default logs path of the platform where {beatname_uc} is running:
`/var/lib/docker/containers/` on Linux and `C:\\ProgramData\\Docker\\containers` on Windows.
If you change the default value, the container ID must follow right after the
`logs_path`, as in `<logs_path>/<container_id>`, where `container_id` is a
64-character-long hexadecimal string.

`resource_type`:: (Optional) Type of the resource to obtain the ID of.
Valid values for `resource_type`:
* `pod`: makes the lookup based on the pod UID. When `resource_type` is set to
`pod`, `logs_path` must be set as well. Supported paths in this case:
** `/var/lib/kubelet/pods/`: used to read logs from volumes mounted into the pod;
those logs end up under `/var/lib/kubelet/pods/<pod UID>/volumes/<volume name>/...`.
To use `/var/lib/kubelet/pods/` as the `logs_path`, `/var/lib/kubelet/pods` must be
mounted into the Filebeat Pods.
** `/var/log/pods/`
Note: when using `resource_type: 'pod'`, logs are enriched only with pod
metadata (pod ID, pod name, and so on), not with container metadata.
* `container`: makes the lookup based on the container ID. `logs_path` must
be set to `/var/log/containers/`.
It defaults to `container`.

To use the `logs_path` matcher, the Filebeat input path must be a subdirectory
of the directory defined in the `logs_path` configuration setting.
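
As an illustration of the container ID lookup described above, the sketch below extracts the 64-character ID from a log file path. It is an assumption-laden example rather than the processor's code: `containerIDFromPath` and `sampleID` are hypothetical names, only Linux-style paths are handled, and the ID is not checked for being hexadecimal.

```go
package main

import (
	"fmt"
	"strings"
)

// containerIDLen is the length of a Docker container ID (64 hexadecimal characters).
const containerIDLen = 64

// sampleID is a made-up 64-character ID used only for the demonstration below.
var sampleID = strings.Repeat("ab", 32)

// containerIDFromPath is a hypothetical helper that mirrors the container
// branch of the logs_path matcher. It returns "" when no ID can be extracted.
func containerIDFromPath(logsPath, source string) string {
	if !strings.Contains(source, logsPath) {
		return ""
	}
	// /var/log/containers/<pod_name>_<namespace>_<container_name>-<container_id>.log
	// The ID sits right before the ".log" extension.
	if strings.HasPrefix(logsPath, "/var/log/containers/") &&
		strings.HasSuffix(source, ".log") && len(source) >= containerIDLen+len(".log") {
		end := len(source) - len(".log")
		return source[end-containerIDLen : end]
	}
	// Any other logs_path: <logs_path><container_id>/...
	// The length check avoids slicing past the end of the string.
	if len(source) >= len(logsPath)+containerIDLen {
		return source[len(logsPath) : len(logsPath)+containerIDLen]
	}
	return ""
}

func main() {
	docker := "/var/lib/docker/containers/" + sampleID + "/" + sampleID + "-json.log"
	kube := "/var/log/containers/web-0_default_nginx-" + sampleID + ".log"
	fmt.Println(containerIDFromPath("/var/lib/docker/containers/", docker) == sampleID)
	fmt.Println(containerIDFromPath("/var/log/containers/", kube) == sampleID)
}
```

Both layouts resolve to the same ID, which is why the matcher can work either with the default Docker path or with `/var/log/containers/`.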

The default configuration is able to lookup the metadata using the container ID
when the logs are collected from the default docker logs path