Skip to content

Commit

Permalink
Fix sqlite busy errors (#347)
Browse files Browse the repository at this point in the history
Fix sqlite busy errors with session based learning

There were a couple different errors
* We were still enqueuing block ids and not sessions in some places
* When we call Enqueue we should filter out sessions that aren't
suitable for learning; this should minimize DB contention
* Improve logging

By default sqlite doesn't have any retries for SQLITE_BUSY errors. We
could configure a timeout for retries using a PRAGMA; we should probably
do that but I want to hold off to see if this PR fixes the contention.
  * Filed #348 to follow up on this

Use ko to create docker images as part of the development process
* This is convenient when we want to build a docker image from our
latest changes without doing a release.
* Change the Dockerfile to not include any arguments so that it is
compatible with the image ko builds
  • Loading branch information
jlewi authored Nov 25, 2024
1 parent 93ff799 commit 2c1180c
Show file tree
Hide file tree
Showing 12 changed files with 286 additions and 129 deletions.
15 changes: 15 additions & 0 deletions app/.ko.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# https://ko.build/configuration/#setting-build-flags-and-ldflags
defaultFlags:
# https://ko.build/configuration/#naming-images
# --base-import-paths removes the md5hash from the image name
# but it appends to the repo path a name based on the path of the module which in this case is "app"
# TODO(jeremy): This doesn't seem to be working; I still have to set the flag when I run ko.
- --base-import-paths
builds:
- id: foyle
ldflags:
- -s
- -w
- -X 'github.com/jlewi/foyle/app/cmd.date={{.Date}}'
- -X 'github.com/jlewi/foyle/app/cmd.version=dev'
- -X 'github.com/jlewi/foyle/app/cmd.commit={{.Git.ShortCommit}}'
3 changes: 2 additions & 1 deletion app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ FROM ${RUNTIME_IMAGE}

COPY --from=builder /workspace/app/foyle /

ENTRYPOINT ["/foyle", "serve"]
# N.B. Don't set any arguments because we want to be compatible with the images ko builds
ENTRYPOINT ["/foyle"]
18 changes: 1 addition & 17 deletions app/pkg/analyze/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type blockItem struct {
}

// PostSessionEvent interface for functions to post session events.
type PostSessionEvent func(id string) error
type PostSessionEvent func(session *logspb.Session) error

// Run runs the analyzer; continually processing logs.
// learnNotifier is an optional function that will be called when a block is updated.
Expand Down Expand Up @@ -466,14 +466,6 @@ func (a *Analyzer) processLogEvent(ctx context.Context, entry *api.LogEntry) {
}); err != nil {
log.Error(err, "Failed to update block with execution", "blockId", bid)
}
// We need to enqueue the block for processing since it was executed.
// The learner will decide whether the blockLog has all the information it needs otherwise it will
// disregard the block item and wait for further events.
if a.learnNotifier != nil {
if err := a.learnNotifier(bid); err != nil {
log.Error(err, "Error notifying block event", "blockId", bid)
}
}
case v1alpha1.LogEventType_ACCEPTED:
fallthrough
case v1alpha1.LogEventType_REJECTED:
Expand Down Expand Up @@ -666,14 +658,6 @@ func (a *Analyzer) handleBlockEvents(ctx context.Context) {
if err != nil {
log.Error(err, "Error processing block", "blockId", blockItem.id)
}
// We need to enqueue the block for processing since it was executed.
// The learner will decide whether the blockLog has all the information it needs otherwise it will
// disregard the block item and wait for further events.
if a.learnNotifier != nil {
if err := a.learnNotifier(blockItem.id); err != nil {
log.Error(err, "Error notifying block event", "blockId", blockItem.id)
}
}
if a.signalBlockDone != nil {
a.signalBlockDone <- blockItem.id
}
Expand Down
15 changes: 5 additions & 10 deletions app/pkg/analyze/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ type fakeNotifier struct {
counts map[string]int
}

func (f *fakeNotifier) PostBlockEvent(blockID string) error {
func (f *fakeNotifier) PostSession(session *logspb.Session) error {
if f.counts == nil {
f.counts = make(map[string]int)
}
if _, ok := f.counts[blockID]; !ok {
f.counts[blockID] = 0
if _, ok := f.counts[session.GetContextId()]; !ok {
f.counts[session.GetContextId()] = 0

}
f.counts[blockID] += 1
f.counts[session.GetContextId()] += 1
return nil
}

Expand Down Expand Up @@ -276,7 +276,7 @@ func Test_Analyzer(t *testing.T) {
a.signalBlockDone = blockProccessed

fakeNotifier := &fakeNotifier{}
if err := a.Run(context.Background(), []string{rawDir}, fakeNotifier.PostBlockEvent); err != nil {
if err := a.Run(context.Background(), []string{rawDir}, fakeNotifier.PostSession); err != nil {
t.Fatalf("Analyze failed: %v", err)
}

Expand Down Expand Up @@ -323,11 +323,6 @@ func Test_Analyzer(t *testing.T) {
t.Errorf("Expected ExecutedBlock to be set")
}

// Check the block notifier was called twice; once after the generated block and once after the executed block
if fakeNotifier.counts[expectedBlockID] != 2 {
t.Errorf("Expected block notifier to be called twice but got %d", fakeNotifier.counts[expectedBlockID])
}

// Now append some logs to the logFile and see that they get processed
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions app/pkg/analyze/dbs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package analyze

import (
"context"

"github.com/cockroachdb/pebble"
"github.com/jlewi/foyle/app/pkg/dbutil"
"github.com/jlewi/foyle/app/pkg/logs"
logspb "github.com/jlewi/foyle/protos/go/foyle/logs"
"github.com/pkg/errors"
"modernc.org/sqlite"
sqlite3 "modernc.org/sqlite/lib"
)

// NewLockingBlocksDB helper function to create a new LockingDB for BlockLog.
Expand Down Expand Up @@ -39,3 +45,14 @@ func getLogEntriesVersion(m *logspb.LogEntries) string {
// setLogEntriesVersion stores version in the ResourceVersion field of m.
func setLogEntriesVersion(m *logspb.LogEntries, version string) {
m.ResourceVersion = version
}

// logDBErrors records SQLITE_BUSY failures returned by database operations.
//
// If err (or an error it wraps) is a *sqlite.Error whose primary result code is SQLITE_BUSY, the
// sqlLiteBusyErrs counter is incremented and the error is logged using the logger taken from ctx.
// Any other error, including nil, is ignored. The function never handles err itself; callers must
// still return or otherwise deal with it.
func logDBErrors(ctx context.Context, err error) {
	log := logs.FromContext(ctx)

	var sqlLiteErr *sqlite.Error
	if !errors.As(err, &sqlLiteErr) {
		return
	}

	// SQLite extended result codes (e.g. SQLITE_BUSY_RECOVERY, SQLITE_BUSY_SNAPSHOT, SQLITE_BUSY_TIMEOUT)
	// keep the primary result code in their least significant 8 bits. Mask before comparing so that
	// extended busy codes are counted too, not only the plain SQLITE_BUSY value.
	if sqlLiteErr.Code()&0xff != sqlite3.SQLITE_BUSY {
		return
	}

	sqlLiteBusyErrs.Inc()
	log.Error(err, "SQLITE_BUSY")
}
16 changes: 10 additions & 6 deletions app/pkg/analyze/session_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,22 @@ func (p *sessionBuilder) processLogEvent(entry *api.LogEntry, notifier PostSessi
return
}

var session *logspb.Session
updateFunc := func(s *logspb.Session) error {
return updateSessionFromEvent(event, entry.Time(), s)
err := updateSessionFromEvent(event, entry.Time(), s)
// Keep a reference to the updated session (this is the same pointer, not a deep copy) because we will process it down below
session = s
return err
}

if err := p.sessions.Update(context.Background(), event.GetContextId(), updateFunc); err != nil {
log.Error(err, "Failed to update session", "event", event)
log.Error(err, "Failed to update session", "event", event, "contextId", event.GetContextId())
return
}

if event.Type == v1alpha1.LogEventType_SESSION_END {
if err := notifier(event.GetContextId()); err != nil {
log.Error(err, "Failed to send session process event")
if err := notifier(session); err != nil {
log.Error(err, "Failed to send session process event", "contextId", event.GetContextId())
}
}
}
Expand All @@ -90,7 +94,7 @@ func (p *sessionBuilder) processLLMUsage(entry *api.LogEntry) {
}
contextId, ok := entry.GetString("contextId")
if !ok {
log.Error(errors.New("Failed to handle LLMUsage log entry"), "LLMUsage is missing contextId", "entry", entry)
log.Error(errors.New("Failed to handle LLMUsage log entry"), "LLMUsage is missing contextId", "entry", entry, "contextId", contextId)
return
}

Expand All @@ -99,7 +103,7 @@ func (p *sessionBuilder) processLLMUsage(entry *api.LogEntry) {
}

if err := p.sessions.Update(context.Background(), contextId, updateFunc); err != nil {
log.Error(err, "Failed to update session", "usage", usage)
log.Error(err, "Failed to update session", "usage", usage, "contextId", contextId)
}
}

Expand Down
4 changes: 2 additions & 2 deletions app/pkg/analyze/session_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func setup() (testTuple, error) {
}

// Process the log entry
func testNotifier(contextId string) error {
fmt.Printf("Received session end event for context: %v", contextId)
func testNotifier(session *logspb.Session) error {
fmt.Printf("Received session end event for context: %v", session.GetContextId())
return nil
}

Expand Down
140 changes: 87 additions & 53 deletions app/pkg/analyze/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"os"
"path/filepath"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"connectrpc.com/connect"
"github.com/jlewi/foyle/app/pkg/logs"
"github.com/jlewi/foyle/app/pkg/runme/converters"
Expand All @@ -28,6 +31,21 @@ const (
SQLLiteDriver = "sqlite"
)

// Prometheus metrics for session database operations.
var (
// sessCounter counts session update events; the "status" label identifies which stage or
// outcome of the update is being recorded.
sessCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "session_updates",
Help: "Number of sessions updated",
},
[]string{"status"},
)

// sqlLiteBusyErrs counts database operations that failed because sqlite reported it was busy.
sqlLiteBusyErrs = promauto.NewCounter(prometheus.CounterOpts{
Name: "sqlite_busy",
Help: "Number of operations that failed because sqlite was busy",
})
)

// GetDDL return the DDL for the database.
// This is a hack because the DDL statements for the sessions and eval results tables are in the same file and package.
// The Evaluator needs to be able to get the DDL in order to create the eval results table. We should clean this up
Expand Down Expand Up @@ -68,6 +86,7 @@ func (db *SessionsManager) Get(ctx context.Context, contextID string) (*logspb.S
sessRow, err := queries.GetSession(ctx, contextID)

if err != nil {
logDBErrors(ctx, err)
return nil, err
}

Expand All @@ -92,81 +111,96 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF
}
log = log.WithValues("contextId", contextID)

sessCounter.WithLabelValues("start").Inc()

tx, err := db.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
// DO NOT COMMIT
sessCounter.WithLabelValues("failedstart").Inc()
return errors.Wrapf(err, "Failed to start transaction")
}

queries := db.queries.WithTx(tx)
// Read the record
sessRow, err := queries.GetSession(ctx, contextID)
err = func() error {
queries := db.queries.WithTx(tx)
// Read the record
sessRow, err := queries.GetSession(ctx, contextID)

// If the session doesn't exist then there is nothing to load because session is initialized to an empty session
session := &logspb.Session{
ContextId: contextID,
}
if err != nil {
if err != sql.ErrNoRows {
if txErr := tx.Rollback(); txErr != nil {
log.Error(txErr, "Failed to rollback transaction")
}
return errors.Wrapf(err, "Failed to get session with id %v", contextID)
// If the session doesn't exist then there is nothing to load because session is initialized to an empty session
session := &logspb.Session{
ContextId: contextID,
}
} else {
// Deserialize the proto
if err := proto.Unmarshal(sessRow.Proto, session); err != nil {
if txErr := tx.Rollback(); txErr != nil {
log.Error(txErr, "Failed to rollback transaction")
if err != nil {
logDBErrors(ctx, err)
if err != sql.ErrNoRows {
// DO NOT COMMIT
sessCounter.WithLabelValues("failedget").Inc()
return errors.Wrapf(err, "Failed to get session with id %v", contextID)
}
// ErrNoRows means the session doesn't exist so we just continue with the empty session
} else {
// Deserialize the proto
if err := proto.Unmarshal(sessRow.Proto, session); err != nil {
return errors.Wrapf(err, "Failed to deserialize session")
}
return errors.Wrapf(err, "Failed to deserialize session")
}
}

if err := updateFunc(session); err != nil {
if txErr := tx.Rollback(); txErr != nil {
log.Error(txErr, "Failed to rollback transaction")
// DO NOT COMMIT
sessCounter.WithLabelValues("callupdatefunc").Inc()

if err := updateFunc(session); err != nil {
return errors.Wrapf(err, "Failed to update session")
}
return errors.Wrapf(err, "Failed to update session")
}

newRow, err := protoToRow(session)
if err != nil {
if txErr := tx.Rollback(); txErr != nil {
log.Error(txErr, "Failed to rollback transaction")
newRow, err := protoToRow(session)
if err != nil {
return errors.Wrapf(err, "Failed to convert session proto to table row")
}
return errors.Wrapf(err, "Failed to convert session proto to table row")
}

if newRow.Contextid != contextID {
if txErr := tx.Rollback(); txErr != nil {
log.Error(txErr, "Failed to rollback transaction")
if newRow.Contextid != contextID {
return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid))
}
return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid))
}

update := fsql.UpdateSessionParams{
Contextid: contextID,
Proto: newRow.Proto,
Starttime: newRow.Starttime,
Endtime: newRow.Endtime,
Selectedid: newRow.Selectedid,
Selectedkind: newRow.Selectedkind,
TotalInputTokens: newRow.TotalInputTokens,
TotalOutputTokens: newRow.TotalOutputTokens,
NumGenerateTraces: newRow.NumGenerateTraces,
}
update := fsql.UpdateSessionParams{
Contextid: contextID,
Proto: newRow.Proto,
Starttime: newRow.Starttime,
Endtime: newRow.Endtime,
Selectedid: newRow.Selectedid,
Selectedkind: newRow.Selectedkind,
TotalInputTokens: newRow.TotalInputTokens,
TotalOutputTokens: newRow.TotalOutputTokens,
NumGenerateTraces: newRow.NumGenerateTraces,
}

if err := queries.UpdateSession(ctx, update); err != nil {
// DO NOT COMMIT
sessCounter.WithLabelValues("callupdatesession").Inc()
if err := queries.UpdateSession(ctx, update); err != nil {
logDBErrors(ctx, err)
return errors.Wrapf(err, "Failed to update session")
}
return nil
}()

if err == nil {
if err := tx.Commit(); err != nil {
logDBErrors(ctx, err)
log.Error(err, "Failed to commit transaction")
sessCounter.WithLabelValues("commitfail").Inc()
return errors.Wrapf(err, "Failed to commit transaction")
}
sessCounter.WithLabelValues("success").Inc()
} else {
logDBErrors(ctx, err)
sessCounter.WithLabelValues("fail").Inc()
log.Error(err, "Failed to update session")
if txErr := tx.Rollback(); txErr != nil {
log.Error(txErr, "Failed to rollback transaction")
}
return errors.Wrapf(err, "Failed to update session")
}

if err := tx.Commit(); err != nil {
return errors.Wrapf(err, "Failed to commit transaction")
return err
}

// DO NOT COMMIT
sessCounter.WithLabelValues("done").Inc()
return nil
}

Expand Down
Loading

0 comments on commit 2c1180c

Please sign in to comment.