Skip to content

Commit

Permalink
WIP: regexp match/no match
Browse files Browse the repository at this point in the history
  • Loading branch information
jrockway committed Jun 18, 2022
1 parent b26461f commit 5d35de5
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 104 deletions.
36 changes: 30 additions & 6 deletions cmd/jlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"os/signal"
"regexp"
"runtime/debug"
"runtime/pprof"
"strings"
Expand Down Expand Up @@ -43,7 +44,9 @@ type output struct {
}

type general struct {
JQ string `short:"e" description:"A jq program to run on each record in the processed input; use this to ignore certain lines, add fields, etc. Hint: 'select(condition)' will remove lines that don't match 'condition'."`
MatchRegex string `short:"g" long:"regex" description:"A regular expression that removes lines from the output that don't match, like grep."`
NoMatchRegex string `short:"G" long:"no-regex" description:"A regular expression that removes lines from the output that DO match, like 'grep -v'."`
JQ string `short:"e" long:"jq" description:"A jq program to run on each record in the processed input; use this to ignore certain lines, add fields, etc. Hint: 'select(condition)' will remove lines that don't match 'condition'."`
NoColor bool `short:"M" long:"no-color" description:"Disable the use of color." env:"JLOG_FORCE_MONOCHROME"`
NoMonochrome bool `short:"c" long:"no-monochrome" description:"Force the use of color." ENV:"JLOG_FORCE_COLOR"`
Profile string `long:"profile" description:"If set, collect a CPU profile and write it to this file."`
Expand All @@ -59,8 +62,8 @@ type input struct {
NoTimestampKey bool `long:"notimekey" description:"If set, don't look for a time, and don't display times." env:"JLOG_NO_TIMESTAMP_KEY"`
MessageKey string `long:"messagekey" description:"JSON key that holds the log message." env:"JLOG_MESSAGE_KEY"`
NoMessageKey bool `long:"nomessagekey" description:"If set, don't look for a message, and don't display messages (time/level + fields only)." env:"JLOG_NO_MESSAGE_KEY"`
DeleteKeys []string `long:"delete" description:"JSON keys to be deleted before JQ processing and output; repeatable." env:"JLOG_DELETE_KEYS"`
UpgradeKeys []string `long:"upgrade" description:"JSON key (of type object) whose fields should be merged with any other fields; good for loggers that always put structed data in a separate key; repeatable.\n--upgrade b would transform as follows: {a:'a', b:{'c':'c'}} -> {a:'a', c:'c'}" env:"JLOG_UPGRADE_KEYS"`
DeleteKeys []string `long:"delete" description:"JSON keys to be deleted before JQ processing and output; repeatable." env:"JLOG_DELETE_KEYS" env-delim:","`
UpgradeKeys []string `long:"upgrade" description:"JSON key (of type object) whose fields should be merged with any other fields; good for loggers that always put structed data in a separate key; repeatable.\n--upgrade b would transform as follows: {a:'a', b:{'c':'c'}} -> {a:'a', c:'c'}" env:"JLOG_UPGRADE_KEYS" env-delim:","`
}

func printVersion(w io.Writer) {
Expand Down Expand Up @@ -164,8 +167,29 @@ func main() {
subsecondFormt = ""
}

jq, err := parse.CompileJQ(gen.JQ)
if err != nil {
fsch := new(parse.FilterScheme)
if gen.MatchRegex != "" && gen.NoMatchRegex != "" {
fmt.Fprintf(os.Stderr, "cannot have both a non-empty MatchRegex and a non-empty NoMatchRegex\n")
os.Exit(1)
}
if rx := gen.MatchRegex; rx != "" {
regex, err := regexp.Compile(rx)
if err != nil {
fmt.Fprintf(os.Stderr, "problem compiling MatchRegex: %v\n", err)
os.Exit(1)
}
fsch.MatchRegex = regex
}
if rx := gen.NoMatchRegex; rx != "" {
regex, err := regexp.Compile(rx)
if err != nil {
fmt.Fprintf(os.Stderr, "problem compiling NoMatchRegex: %v\n", err)
os.Exit(1)
}
fsch.NoMatchRegex = regex
}

if err := fsch.AddJQ(gen.JQ); err != nil {
fmt.Fprintf(os.Stderr, "problem %v\n", err)
os.Exit(1)
}
Expand Down Expand Up @@ -247,7 +271,7 @@ func main() {
signal.Stop(sigCh)
}()

summary, err := parse.ReadLog(os.Stdin, colorable.NewColorableStdout(), ins, outs, jq)
summary, err := parse.ReadLog(os.Stdin, colorable.NewColorableStdout(), ins, outs, fsch)
if err != nil {
if signals := atomic.LoadInt32(&nSignals); signals < 1 || !strings.Contains(err.Error(), "file already closed") {
outs.EmitError(err.Error())
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/loggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func TestLoggers(t *testing.T) {
PriorityFields: []string{"error", "string", "int", "object", "source"},
Formatter: f,
}
fs := &parse.FilterScheme{}
golden := `
INFO 1 ok line 1
INFO 2 ok line 2 string:value int:42 object:{"foo":"bar"}
Expand All @@ -235,7 +236,7 @@ ERROR 3 ok line 3 error:whoa
output := new(bytes.Buffer)
test.f(input)
inputCopy := *input
if _, err := parse.ReadLog(input, output, ins, outs, nil); err != nil {
if _, err := parse.ReadLog(input, output, ins, outs, fs); err != nil {
t.Fatalf("readlog: %v", err)
}
want := golden
Expand Down
159 changes: 159 additions & 0 deletions pkg/parse/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package parse

import (
"errors"
"fmt"
"regexp"

"github.com/itchyny/gojq"
)

// FilterScheme controls how lines are filtered. Every field is optional: a nil field disables the
// corresponding filter, so the zero value filters nothing.
type FilterScheme struct {
// JQ is the compiled jq program run against each line's fields; set it with AddJQ. When nil,
// no jq processing happens.
JQ *gojq.Code
// MatchRegex, when non-nil, removes lines whose message does not match it.
MatchRegex *regexp.Regexp
// NoMatchRegex, when non-nil, removes lines whose message does match it.
NoMatchRegex *regexp.Regexp
}

// DefaultVariables are variables available to JQ programs. compileJQ registers these names with
// gojq, and prepareVariables produces one value per name for every line, so the entries here and
// the values there must stay in the same order.
var DefaultVariables = []string{
"$TS",
"$RAW", "$MSG",
"$LVL", "$UNKNOWN", "$TRACE", "$DEBUG", "$INFO", "$WARN", "$ERROR", "$PANIC", "$DPANIC", "$FATAL",
}

// prepareVariables builds the values bound to DefaultVariables for one line. The result has one
// entry per variable name, in the same order as DefaultVariables.
func prepareVariables(l *line) []interface{} {
	// $TS: the line's timestamp as fractional seconds since the Unix epoch.
	ts := float64(l.time.UnixNano()) / 1e9

	// $LVL is the line's own level; the remaining entries are the level constants that a jq
	// program can compare $LVL against.
	levels := [...]uint8{
		uint8(l.lvl),
		uint8(LevelUnknown),
		uint8(LevelTrace),
		uint8(LevelDebug),
		uint8(LevelInfo),
		uint8(LevelWarn),
		uint8(LevelError),
		uint8(LevelPanic),
		uint8(LevelDPanic),
		uint8(LevelFatal),
	}

	vars := make([]interface{}, 0, 3+len(levels))
	vars = append(vars, ts, string(l.raw), l.msg) // $TS, $RAW, $MSG
	for _, lvl := range levels {
		vars = append(vars, lvl)
	}
	return vars
}

// highlightKey is a special key that controls highlighting. The jq helper 'highlight($cond)' that
// compileJQ prepends to every program adds this key to the record; runJQ removes it from the
// resulting fields again and, when its value is a boolean, copies that value to the line's
// highlight flag.
const highlightKey = "__highlight"

// compileJQ parses and compiles the jq program p, making DefaultVariables available to it.
//
// An empty program is not an error: it yields a nil *gojq.Code, which runJQ treats as "no jq
// processing". Parse and compile failures are returned wrapped (with %w), so callers can still
// inspect the underlying gojq error with errors.Is / errors.As.
func compileJQ(p string) (*gojq.Code, error) {
	if p == "" {
		return nil, nil
	}
	// Every program gets a 'highlight' helper that records its condition under the __highlight
	// key. Note that the error messages below quote the program including this prelude.
	p = "def highlight($cond): . + {__highlight: $cond};\n" + p
	q, err := gojq.Parse(p)
	if err != nil {
		return nil, fmt.Errorf("parsing jq program %q: %w", p, err)
	}
	jq, err := gojq.Compile(q, gojq.WithVariables(DefaultVariables))
	if err != nil {
		return nil, fmt.Errorf("compiling jq program %q: %w", p, err)
	}
	return jq, nil
}

// AddJQ compiles the jq program p and installs it as this scheme's jq filter. Only one program may
// be installed; a second call after a successful non-empty AddJQ fails. An empty program compiles
// to nothing and leaves the scheme without a jq filter.
func (f *FilterScheme) AddJQ(p string) error {
	if f.JQ != nil {
		return errors.New("jq program already added")
	}
	code, err := compileJQ(p)
	if err == nil {
		f.JQ = code
	}
	// compileJQ already annotates its errors, so pass them through unchanged.
	return err
}

// runJQ applies the scheme's jq program (if any) to the line, replacing the line's fields with the
// program's output. It reports true when the program produced no output at all, meaning the line
// should be filtered out. An error is returned when the program fails, yields something other than
// an object, or yields more than one result.
func (f *FilterScheme) runJQ(l *line) (bool, error) {
	if f.JQ == nil {
		return false, nil
	}

	iter := f.JQ.Run(l.fields, prepareVariables(l)...)
	result, ok := iter.Next()
	if !ok {
		// No output: the program dropped this line.
		l.fields = make(map[string]interface{})
		return true, nil
	}

	switch x := result.(type) {
	case nil:
		return false, errors.New("unexpected nil result; yield an empty map ('{}') to delete all fields")
	case bool:
		return false, errors.New("unexpected boolean output; did you mean to use 'select(...)'?")
	case error:
		return false, fmt.Errorf("error: %w", x)
	case map[string]interface{}:
		// The highlight marker is an instruction to us, not data; strip it from the output.
		if raw, present := x[highlightKey]; present {
			delete(x, highlightKey)
			if hi, isBool := raw.(bool); isBool {
				l.highlight = hi
			}
		}
		l.fields = x
	default:
		return false, fmt.Errorf("unexpected result type %T(%#v)", result, result)
	}

	// Only the first result is used, so any further output is rejected. This can be revisited in
	// the future.
	if _, more := iter.Next(); more {
		return false, errors.New("unexpectedly produced more than 1 output")
	}
	return false, nil
}

// regexpScope determines what fields a regexp should run against. Not implemented yet.
// Only the message scope exists so far.
type regexpScope int

const (
// regexpScopeUnknown is the zero value; runRegexp panics when asked to use it.
regexpScopeUnknown regexpScope = iota
// regexpScopeMessage runs the regexp against the line's message.
regexpScopeMessage
)

// runRegexp runs rx against the part of the line selected by scope and reports whether it matched.
//
// On a match, every capture group is stored in l.fields: named groups under their name, unnamed
// groups under "$N" where N is the group's index. Groups that did not participate in the match
// are stored as the empty string. The fields map is created if the line does not have one yet.
//
// An unrecognised scope is a programming error and panics.
func runRegexp(rx *regexp.Regexp, l *line, scope regexpScope) bool {
	var input string
	switch scope {
	case regexpScopeMessage:
		input = l.msg
	default:
		// Covers regexpScopeUnknown and any value without a defined meaning; silently matching
		// against an empty input would hide the bug.
		panic("unknown regexp scope")
	}

	// Without capture groups there is nothing to extract, so skip building the submatch slice.
	if rx.NumSubexp() == 0 {
		return rx.MatchString(input)
	}

	groups := rx.FindStringSubmatch(input)
	if groups == nil {
		return false
	}
	if l.fields == nil {
		// Writing to a nil map panics; a line without fields can still gain captured ones.
		l.fields = make(map[string]interface{}, len(groups)-1)
	}
	names := rx.SubexpNames()
	for i := 1; i < len(names); i++ { // index 0 is the whole match, not a group
		name := names[i]
		if name == "" {
			name = fmt.Sprintf("$%v", i)
		}
		l.fields[name] = groups[i]
	}
	return true
}

// Run runs all the filters defined in this FilterScheme against the provided line. The return
// value is true if the line should be removed from the output ("filtered").
//
// Filters run in a fixed order: NoMatchRegex, then MatchRegex, then the jq program. The first
// filter that rejects the line ends processing, so the jq program only sees lines that passed both
// regexps. A matching regexp also adds its capture groups to the line's fields before jq runs.
//
// A nil FilterScheme is valid and filters nothing, so callers without any filters can pass nil.
// Errors can only originate from the jq program and are returned wrapped.
func (f *FilterScheme) Run(l *line) (bool, error) {
	if f == nil {
		return false, nil
	}
	if rx := f.NoMatchRegex; rx != nil && runRegexp(rx, l, regexpScopeMessage) {
		return true, nil
	}
	if rx := f.MatchRegex; rx != nil && !runRegexp(rx, l, regexpScopeMessage) {
		return true, nil
	}
	filtered, err := f.runJQ(l)
	if err != nil {
		return false, fmt.Errorf("jq: %w", err)
	}
	return filtered, nil
}
92 changes: 9 additions & 83 deletions pkg/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"runtime"
"time"

"github.com/itchyny/gojq"
"github.com/logrusorgru/aurora/v3"
)

Expand Down Expand Up @@ -130,84 +129,11 @@ type Summary struct {
Filtered int
}

var DefaultVariables = []string{
"$TS",
"$RAW", "$MSG",
"$LVL", "$UNKNOWN", "$TRACE", "$DEBUG", "$INFO", "$WARN", "$ERROR", "$PANIC", "$DPANIC", "$FATAL",
}

func prepareVariables(l *line) []interface{} {
return []interface{}{
float64(l.time.UnixNano()) / 1e9, // $TS
string(l.raw), l.msg,
uint8(l.lvl), uint8(LevelUnknown), uint8(LevelTrace), uint8(LevelDebug), uint8(LevelInfo), uint8(LevelWarn), uint8(LevelError), uint8(LevelPanic), uint8(LevelDPanic), uint8(LevelFatal),
}
}

const highlightKey = "__highlight"

// CompileJQ compiles the provided jq program.
func CompileJQ(p string) (*gojq.Code, error) {
if p == "" {
return nil, nil
}
p = "def highlight($cond): . + {__highlight: $cond};\n" + p
q, err := gojq.Parse(p)
if err != nil {
return nil, fmt.Errorf("parsing jq program %q: %v", p, err)
}
jq, err := gojq.Compile(q, gojq.WithVariables(DefaultVariables))
if err != nil {
return nil, fmt.Errorf("compiling jq program %q: %v", p, err)
}
return jq, nil
}

// runJQ runs the provided jq program on the provided line. It returns true if the result is empty
// (i.e., the line should be filtered out), and an error if the output type is invalid or another
// error occurred.
func runJQ(jq *gojq.Code, l *line) (bool, error) {
if jq == nil {
return false, nil
}
var filtered bool
iter := jq.Run(l.fields, prepareVariables(l)...)
if result, ok := iter.Next(); ok {
switch x := result.(type) {
case map[string]interface{}:
if raw, ok := x[highlightKey]; ok {
delete(x, highlightKey)
if hi, ok := raw.(bool); ok {
l.highlight = hi
}
}
l.fields = x
case nil:
return false, errors.New("unexpected nil result; yield an empty map ('{}') to delete all fields")
case error:
return false, fmt.Errorf("error: %w", x)
case bool:
return false, errors.New("unexpected boolean output; did you mean to use 'select(...)'?")
default:
return false, fmt.Errorf("unexpected result type %T(%#v)", result, result)
}
if _, ok = iter.Next(); ok {
// We only use the first line that is output. This can be revisited in the
// future.
return false, errors.New("unexpectedly produced more than 1 output")
}
} else {
filtered = true
l.fields = make(map[string]interface{})
}
return filtered, nil
}

// ReadLog reads a stream of JSON-formatted log lines from the provided reader according to the
// input schema, reformatting it and writing to the provided writer according to the output schema.
// Parse errors are handled according to the input schema. Any other errors, not including io.EOF
// on the reader, are returned.
func ReadLog(r io.Reader, w io.Writer, ins *InputSchema, outs *OutputSchema, jq *gojq.Code) (Summary, error) {
func ReadLog(r io.Reader, w io.Writer, ins *InputSchema, outs *OutputSchema, filter *FilterScheme) (Summary, error) {
s := bufio.NewScanner(r)
s.Buffer(make([]byte, 0, LineBufferSize), LineBufferSize)
var l line
Expand Down Expand Up @@ -309,18 +235,18 @@ func ReadLog(r io.Reader, w io.Writer, ins *InputSchema, outs *OutputSchema, jq

// Filter.
var err error
filtered, err = runJQ(jq, &l)
filtered, err = filter.Run(&l)
if err != nil {
addError = true
writeRawLine = true
recoverable = false
// It is questionable as to whether or not jq breaking means that we
// should stop processing the log entirely. It's probably a bug in
// the jq program that affects every line, so the sooner we return
// the error, the sooner the user can fix their program. But on the
// other hand, is it worth it to spend the time debugging a jq
// program that's only broken on one line out of a billion?
return fmt.Errorf("jq: %w", err)
// It is questionable as to whether or not a filter breaking means
// that we should stop processing the log entirely. It's probably a
// bug in the filter that affects every line, so the sooner we
// return the error, the sooner the user can fix their filter. But
// on the other hand, is it worth it to spend the time debugging a
// jq program that's only broken on one line out of a billion?
return fmt.Errorf("filter: %w", err)
}
if filtered {
sum.Filtered++
Expand Down
8 changes: 5 additions & 3 deletions pkg/parse/parse_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import (
"testing"
"time"

"github.com/itchyny/gojq"
"github.com/jrockway/json-logs/pkg/parse/internal/fuzzsupport"
"github.com/logrusorgru/aurora/v3"
)

// runReadLog runs ReadLog against some input, and asserts that certain expectations are met. It's
// used to implement FuzzReadLogs and FuzzReadLogsWithJSON.
func runReadLog(t *testing.T, jq *gojq.Code, in []byte, expectedLines int) {
func runReadLog(t *testing.T, fs *FilterScheme, in []byte, expectedLines int) {
t.Helper()
if fs == nil {
fs = new(FilterScheme)
}
inbuf := bytes.NewReader(in)
ins := &InputSchema{
Strict: false,
Expand All @@ -37,7 +39,7 @@ func runReadLog(t *testing.T, jq *gojq.Code, in []byte, expectedLines int) {
},
}
outbuf := new(bytes.Buffer)
summary, err := ReadLog(inbuf, outbuf, ins, outs, jq)
summary, err := ReadLog(inbuf, outbuf, ins, outs, fs)
if err != nil {
if errors.Is(err, bufio.ErrTooLong) {
// This is a known limit and the fuzzer likes to produce very long
Expand Down
Loading

0 comments on commit 5d35de5

Please sign in to comment.