Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add query-log-path configuration #25710

Draft
wants to merge 15 commits into
base: master-1.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,16 @@ func (s *Server) Open() error {
// so it only logs as is appropriate.
s.QueryExecutor.TaskManager.Logger = s.Logger
}
if s.config.Data.QueryLogPath != "" {
path := s.config.Data.QueryLogPath
flw := query.NewFileLogWatcher(s.QueryExecutor, path, s.Logger, s.config.Logging.Format)
if flw != nil {
s.QueryExecutor.WithLogWriter(flw, context.Background())
} else {
s.Logger.Error("error creating log writer", zap.String("path", path))
}
}

s.PointsWriter.WithLogger(s.Logger)
s.Subscriber.WithLogger(s.Logger)
for _, svc := range s.Services {
Expand Down
1 change: 1 addition & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
# Whether queries should be logged before execution. Very useful for troubleshooting, but will
# log any sensitive data contained within a query.
# query-log-enabled = true
# query-log-path = "/var/log/influxdb/query.log"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the default be the empty string?


# It is possible to collect statistics of points written per-measurement and/or per-login.
# These can be accessed via the monitoring subsystem.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/fsnotify/fsnotify v1.4.7
github.com/go-chi/chi v4.1.0+incompatible
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/golang/mock v1.5.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro=
github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8=
Expand Down
8 changes: 4 additions & 4 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
}
}

encoder, err := newEncoder(format)
encoder, err := NewEncoder(format)
if err != nil {
return nil, err
}
Expand All @@ -51,8 +51,8 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
), zap.Fields(zap.String("log_id", nextID()))), nil
}

func newEncoder(format string) (zapcore.Encoder, error) {
config := newEncoderConfig()
func NewEncoder(format string) (zapcore.Encoder, error) {
config := NewEncoderConfig()
switch format {
case "json":
return zapcore.NewJSONEncoder(config), nil
Expand All @@ -65,7 +65,7 @@ func newEncoder(format string) (zapcore.Encoder, error) {
}
}

func newEncoderConfig() zapcore.EncoderConfig {
func NewEncoderConfig() zapcore.EncoderConfig {
config := zap.NewProductionEncoderConfig()
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(ts.UTC().Format(TimeFormat))
Expand Down
83 changes: 83 additions & 0 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -228,6 +230,14 @@ type StatementNormalizer interface {
NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error
}

// WatcherInterface is used for any file watch functionality using fsnotify
type WatcherInterface interface {
FileChangeCapture() error
GetLogger() *zap.Logger
GetLogPath() string
Close()
}

// Executor executes every statement in an Query.
type Executor struct {
// Used for executing a statement in the query.
Expand All @@ -242,6 +252,8 @@ type Executor struct {

// expvar-based stats.
stats *Statistics

mu sync.Mutex
}

// NewExecutor returns a new instance of Executor.
Expand Down Expand Up @@ -289,6 +301,77 @@ func (e *Executor) WithLogger(log *zap.Logger) {
e.TaskManager.Logger = e.Logger
}

func startFileLogWatcher(w WatcherInterface, e *Executor, ctx context.Context) error {
path := w.GetLogPath()
fsnotif, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("create Watcher: %w", err)
}
defer fsnotif.Close()

if err := fsnotif.Add(path); err != nil {
return fmt.Errorf("add watch path: %w", err)
}

for {
select {
case event, ok := <-fsnotif.Events:
if !ok {
return fmt.Errorf("Watcher event channel closed")
}

e.Logger.Debug("received file event", zap.String("event", event.Name))

if event.Op == fsnotify.Remove || event.Op == fsnotify.Rename {
e.Logger.Info("log file altered, creating new Watcher",
zap.String("event", event.Name),
zap.String("path", path))

e.mu.Lock()
if err := w.FileChangeCapture(); err != nil {
return fmt.Errorf("handle file change: %w", err)
}

e.Logger = w.GetLogger()
e.TaskManager.Logger = e.Logger
e.mu.Unlock()
}

case err, ok := <-fsnotif.Errors:
if !ok {
return fmt.Errorf("Watcher error channel closed")
}
return fmt.Errorf("Watcher error: %w", err)

case <-ctx.Done():
e.Logger.Info("closing file Watcher")
return ctx.Err()
}
}
}

func (e *Executor) WithLogWriter(w WatcherInterface, ctx context.Context) {
e.Logger.Info("starting file watcher", zap.String("path", w.GetLogPath()))
errs, ctx := errgroup.WithContext(ctx)

e.mu.Lock()
e.Logger = w.GetLogger()
e.TaskManager.Logger = e.Logger
e.mu.Unlock()

errs.Go(func() error {
return startFileLogWatcher(w, e, ctx)
})

go func() {
err := errs.Wait()
if err != nil {
e.Logger.Error("file log Watcher error", zap.Error(err))
return
}
}()
}

// ExecuteQuery executes each statement within a query.
func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {
results := make(chan *Result)
Expand Down
172 changes: 172 additions & 0 deletions query/executor_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package query_test

import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -595,6 +601,172 @@ func TestQueryExecutor_InvalidSource(t *testing.T) {
}
}

type mockWatcher struct {
ChangeEvents []string
path string
tmpFile *os.File
log *zap.Logger
t *testing.T
Mu sync.Mutex
}

func newMockWatcher(t *testing.T, path string, tmpFile *os.File) *mockWatcher {
encoderConfig := zap.NewProductionEncoderConfig()

fileCore := zapcore.NewCore(
zapcore.NewJSONEncoder(encoderConfig),
zapcore.Lock(tmpFile),
zapcore.InfoLevel,
)

logger := zap.New(fileCore)
return &mockWatcher{
ChangeEvents: make([]string, 0),
path: path,
tmpFile: tmpFile,
log: logger,
t: t,
Mu: sync.Mutex{},
}
}

func (m *mockWatcher) GetLogger() *zap.Logger {
return m.log
}

func (m *mockWatcher) GetLogPath() string {
return m.path
}

func (m *mockWatcher) FileChangeCapture() error {
m.Mu.Lock()
defer m.Mu.Unlock()
m.ChangeEvents = append(m.ChangeEvents, "file updated")
return nil
}

func (m *mockWatcher) Close() {
return
}

func TestQueryExecutor_WriteQueryToLog(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
require.NoError(t, err, "parse query")

f, err := os.CreateTemp("", "query-test.1.log")
require.NoError(t, err, "create temp file")
err = f.Sync()
require.NoError(t, err, "sync temp file")

e := NewQueryExecutor()
mockWatcher := newMockWatcher(t, f.Name(), f)
e.WithLogWriter(mockWatcher, context.Background())

e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
require.Equal(t, uint64(1), ctx.QueryID, "query ID")
return nil
},
}

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
err = f.Sync()
require.NoError(t, err, "sync temp file")

dat, err := os.ReadFile(f.Name())
cont := strings.Contains(string(dat), "SELECT count(value) FROM cpu")
require.True(t, cont, "expected query output")
err = os.Remove(f.Name())
require.NoError(t, err, "remove temp file")
}

// Test to ensure that Watcher creates new file on file rename
func TestQueryExecutor_WriteQueryToLog_WatcherRemoveFile(t *testing.T) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that CI doesn't like these tests where on my local machine they pass no problem. I may need to adjust how I'm doing this 🤔

q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
require.NoError(t, err, "parse query")

f, err := os.CreateTemp("", "query-test.2.log")
require.NoError(t, err, "create temp file")
err = f.Sync()
require.NoError(t, err, "sync temp file")

e := NewQueryExecutor()
mockWatcher := newMockWatcher(t, f.Name(), f)
e.WithLogWriter(mockWatcher, context.Background())

e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
require.Equal(t, uint64(1), ctx.QueryID, "query ID")
return nil
},
}

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
err = f.Sync()
require.NoError(t, err, "sync temp file")

dat, err := os.ReadFile(f.Name())
cont := strings.Contains(string(dat), "SELECT count(value) FROM cpu")
require.True(t, cont, "expected query output")
require.Equal(t, 0, len(mockWatcher.ChangeEvents), "expected change events length")

err = f.Close()
require.NoError(t, err, "close temp file")
// Remove file -- there should be a change event now
err = os.Remove(f.Name())
require.NoError(t, err, "remove temp file")
// sleep for a few ms because fsnotify needs to pick up event
time.Sleep(100 * time.Millisecond)
mockWatcher.Mu.Lock()
defer mockWatcher.Mu.Unlock()
require.Equal(t, 1, len(mockWatcher.ChangeEvents), "expected change events length")
}

// Test to ensure that Watcher creates new file on file rename
func TestQueryExecutor_WriteQueryToLog_WatcherChangeFile(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
require.NoError(t, err, "parse query")

f, err := os.CreateTemp("", "query-test.3.log")
require.NoError(t, err, "create temp file")
err = f.Sync()
require.NoError(t, err, "sync temp file")

e := NewQueryExecutor()
mockWatcher := newMockWatcher(t, f.Name(), f)
e.WithLogWriter(mockWatcher, context.Background())

e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
require.Equal(t, uint64(1), ctx.QueryID, "query ID")
return nil
},
}

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
err = f.Sync()
require.NoError(t, err, "sync temp file")

dat, err := os.ReadFile(f.Name())
cont := strings.Contains(string(dat), "SELECT count(value) FROM cpu")
require.True(t, cont, "expected query output")
mockWatcher.Mu.Lock()
require.Equal(t, 0, len(mockWatcher.ChangeEvents), "expected change events length")
mockWatcher.Mu.Unlock()

// Remove file -- there should be a change event now
err = os.Rename(f.Name(), f.Name()+".foo")
require.NoError(t, err, "rename temp file")
// sleep for a few ms because fsnotify needs to pick up event
time.Sleep(100 * time.Millisecond)

mockWatcher.Mu.Lock()
require.Equal(t, 1, len(mockWatcher.ChangeEvents), "expected change events length")
mockWatcher.Mu.Unlock()
err = os.Remove(f.Name() + ".foo")
require.NoError(t, err, "remove temp file")
}

func discardOutput(results <-chan *query.Result) {
for range results {
// Read all results and discard.
Expand Down
Loading
Loading