Skip to content

Commit

Permalink
feat: add query-log-path configuration
Browse files Browse the repository at this point in the history
This PR adds the ability for an admin to define 'query-log-path' as a configuration item field.

A new configuration option is created

```toml
[data]
query-log-path = "/var/influx/query.log"
```

This will enable query logging.

Logged queries example:
```
{"level":"info","ts":1735248393.084461,"msg":"Executing query","query":"SHOW DATABASES"}
{"level":"info","ts":1735248395.092188,"msg":"Executing query","query":"SHOW DATABASES"}
{"level":"info","ts":1735248398.58039,"msg":"Executing query","query":"SELECT * FROM stress.autogen.m0 LIMIT 20"}
```
  • Loading branch information
devanbenz committed Dec 30, 2024
1 parent db52322 commit 410241e
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ func (s *Server) Open() error {
// so it only logs as is appropriate.
s.QueryExecutor.TaskManager.Logger = s.Logger
}
if s.config.Data.QueryLogPath != "" {
path := s.config.Data.QueryLogPath
s.QueryExecutor.WithLogWriter(s.Logger, path)
}

s.PointsWriter.WithLogger(s.Logger)
s.Subscriber.WithLogger(s.Logger)
for _, svc := range s.Services {
Expand Down
1 change: 1 addition & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
# Whether queries should be logged before execution. Very useful for troubleshooting, but will
# log any sensitive data contained within a query.
# query-log-enabled = true
# query-log-path = "/var/log/influxdb/query.log"

# It is possible to collect statistics of points written per-measurement and/or per-login.
# These can be accessed via the monitoring subsystem.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/fsnotify/fsnotify v1.4.7
github.com/go-chi/chi v4.1.0+incompatible
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/golang/mock v1.5.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro=
github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8=
Expand Down
80 changes: 80 additions & 0 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
Expand Down Expand Up @@ -289,6 +291,84 @@ func (e *Executor) WithLogger(log *zap.Logger) {
e.TaskManager.Logger = e.Logger
}

func initQueryLogWriter(log *zap.Logger, e *Executor, path string) (*os.File, error) {
logFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
e.Logger.Error("failed to open log file", zap.Error(err))
return nil, err
}

existingCore := log.Core()

encoderConfig := zap.NewProductionEncoderConfig()

fileCore := zapcore.NewCore(
zapcore.NewJSONEncoder(encoderConfig),
zapcore.Lock(logFile),
zapcore.InfoLevel,
)

newCore := zapcore.NewTee(existingCore, fileCore)
e.Logger = zap.New(newCore)
e.TaskManager.Logger = e.Logger

return logFile, nil
}

func (e *Executor) WithLogWriter(log *zap.Logger, path string) {
var file *os.File
var err error

file, err = initQueryLogWriter(log, e, path)
if err != nil {
e.Logger.Error("failed to open log file", zap.Error(err))
return
}

go func() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
e.Logger.Error("failed to create log file watcher", zap.Error(err))
return
}

err = watcher.Add(path)
if err != nil {
e.Logger.Error("failed to watch log file", zap.Error(err))
return
}
defer watcher.Close()

for {
select {
case event, ok := <-watcher.Events:
if !ok {
e.Logger.Error("failed to watch log file", zap.String("event", event.Name))
return
}
e.Logger.Debug("event", zap.String("event", event.Name))
if event.Op == fsnotify.Remove || event.Op == fsnotify.Rename {
if err := file.Sync(); err != nil {
e.Logger.Error("failed to sync log file", zap.Error(err))
return
}
if err := file.Close(); err != nil {
e.Logger.Error("failed to close log file", zap.Error(err))
return
}

e.Logger.Debug("creating a new query log file; registered file was altered.", zap.String("event", event.Name), zap.String("path", path))
file, err = initQueryLogWriter(log, e, path)
if err != nil {
e.Logger.Error("failed to create log file watcher", zap.Error(err))
return
}
}
}
}
}()
}

// ExecuteQuery executes each statement within a query.
func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {
results := make(chan *Result)
Expand Down
30 changes: 30 additions & 0 deletions query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package query_test
import (
"errors"
"fmt"
"github.com/stretchr/testify/require"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -595,6 +597,34 @@ func TestQueryExecutor_InvalidSource(t *testing.T) {
}
}

func TestQueryExecutor_WriteQueryToLog(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
require.NoError(t, err, "parse query")

f, err := os.CreateTemp("", "query-test.log")
require.NoError(t, err, "create temp file")

defer os.Remove(f.Name())

e := NewQueryExecutor()
e.WithLogWriter(e.Logger, f.Name())

e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
require.Equal(t, uint64(1), ctx.QueryID, "query ID")
return nil
},
}

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
err = f.Close()
require.NoError(t, err, "close temp file")

dat, err := os.ReadFile(f.Name())
cont := strings.Contains(string(dat), "SELECT count(value) FROM cpu")
require.True(t, cont, "expected query output")
}

func discardOutput(results <-chan *query.Result) {
for range results {
// Read all results and discard.
Expand Down
3 changes: 3 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type Config struct {
// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`

// Query logging directed to a log file
QueryLogPath string `toml:"query-log-path"`

// Compaction options for tsm1 (descriptions above with defaults)
CacheMaxMemorySize toml.Size `toml:"cache-max-memory-size"`
CacheSnapshotMemorySize toml.Size `toml:"cache-snapshot-memory-size"`
Expand Down

0 comments on commit 410241e

Please sign in to comment.