Skip to content

Commit

Permalink
fix: local storage reservations fixes (#11866)
Browse files Browse the repository at this point in the history
* paths: Debugging local storage reservations

* paths: Log when individual reservation is less than on-disk space

* paths: fix debug reservations print

* paths: More reserve logs

* paths: More more reserve logs

* paths: add stacks to duplicate done call log

* curio: task storage: Release storage at most once

* curio: cleanup before restarting sdr

* address review

* paths: Simplify reservation release logic
  • Loading branch information
magik6k authored Apr 15, 2024
1 parent 50ed73d commit bc43bd6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 36 deletions.
8 changes: 8 additions & 0 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
return xerrors.Errorf("computing replica id: %w", err)
}

// make sure the cache dir is empty
if err := os.RemoveAll(paths.Cache); err != nil {
return xerrors.Errorf("removing cache dir: %w", err)
}
if err := os.MkdirAll(paths.Cache, 0755); err != nil {
return xerrors.Errorf("mkdir cache dir: %w", err)
}

// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
Expand Down
8 changes: 7 additions & 1 deletion curiosrc/ffi/task_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ffi

import (
"context"
"sync"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -170,9 +171,14 @@ func (t *TaskStorage) Claim(taskID int) error {
return err
}

var releaseOnce sync.Once
releaseFunc := func() {
releaseOnce.Do(release)
}

sres := &StorageReservation{
SectorRef: sectorRef,
Release: release,
Release: releaseFunc,
Paths: pathsFs,
PathIDs: pathIDs,

Expand Down
125 changes: 90 additions & 35 deletions storage/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"os"
"path/filepath"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -45,20 +46,26 @@ type Local struct {
localLk sync.RWMutex
}

type sectorFile struct {
sid abi.SectorID
ft storiface.SectorFileType
}

type path struct {
local string // absolute local path
maxStorage uint64

reserved int64
reservations map[abi.SectorID]storiface.SectorFileType
reservations map[sectorFile]int64
}

// statExistingSectorForReservation is optional parameter for stat method
// which will make it take into account existing sectors when calculating
// available space for new reservations
type statExistingSectorForReservation struct {
id abi.SectorID
ft storiface.SectorFileType
id abi.SectorID
ft storiface.SectorFileType
overhead int64
}

func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservation) (stat fsutil.FsStat, newResvOnDisk int64, err error) {
Expand All @@ -72,7 +79,7 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat
stat.Reserved = p.reserved
var newReserveOnDisk int64

accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType) (int64, error) {
accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType, overhead int64) (int64, error) {
sp := p.sectorPath(id, fileType)

used, err := ls.DiskUsage(sp)
Expand All @@ -94,35 +101,58 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat
return 0, nil
}

log.Debugw("accounting existing files", "id", id, "fileType", fileType, "path", sp, "used", used, "overhead", overhead)
return used, nil
}

for id, ft := range p.reservations {
for _, fileType := range ft.AllSet() {
onDisk, err := accountExistingFiles(id, fileType)
if err != nil {
return fsutil.FsStat{}, 0, err
}
stat.Reserved -= onDisk
for id, oh := range p.reservations {
onDisk, err := accountExistingFiles(id.sid, id.ft, oh)
if err != nil {
return fsutil.FsStat{}, 0, err
}
if onDisk > oh {
log.Warnw("reserved space on disk is greater than expected", "id", id.sid, "fileType", id.ft, "onDisk", onDisk, "oh", oh)
onDisk = oh
}

stat.Reserved -= onDisk
}
for _, reservation := range newReserve {
for _, fileType := range reservation.ft.AllSet() {
if p.reservations[reservation.id]&fileType != 0 {
log.Debugw("accounting existing files for new reservation", "id", reservation.id, "fileType", fileType, "overhead", reservation.overhead)

resID := sectorFile{reservation.id, fileType}

if _, has := p.reservations[resID]; has {
// already accounted for
continue
}

onDisk, err := accountExistingFiles(reservation.id, fileType)
onDisk, err := accountExistingFiles(reservation.id, fileType, reservation.overhead)
if err != nil {
return fsutil.FsStat{}, 0, err
}
if onDisk > reservation.overhead {
log.Warnw("reserved space on disk is greater than expected (new resv)", "id", reservation.id, "fileType", fileType, "onDisk", onDisk, "oh", reservation.overhead)
onDisk = reservation.overhead
}

newReserveOnDisk += onDisk
}
}

if stat.Reserved < 0 {
log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
//log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
var jsonReservations []map[string]interface{}
for id, res := range p.reservations {
jsonReservations = append(jsonReservations, map[string]interface{}{
"id": id.sid,
"ft": id.ft,
"res": res,
})
}

log.Warnw("negative reserved storage", "reserved", stat.Reserved, "origResv", p.reserved, "reservations", len(p.reservations), "newReserveOnDisk", newReserveOnDisk, "reservations", jsonReservations)
stat.Reserved = 0
}

Expand Down Expand Up @@ -199,7 +229,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {

maxStorage: meta.MaxStorage,
reserved: 0,
reservations: map[abi.SectorID]storiface.SectorFileType{},
reservations: map[sectorFile]int64{},
}

fst, _, err := out.stat(st.localStorage)
Expand Down Expand Up @@ -430,19 +460,43 @@ func (st *Local) reportStorage(ctx context.Context) {
}
}

func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (release func(), err error) {
ssize, err := sid.ProofType.SectorSize()
if err != nil {
return nil, err
}

st.localLk.Lock()

done := func() {}
deferredDone := func() { done() }
var releaseCalled bool

// double release debug guard
var firstDonebuf []byte
var releaseFuncs []func()

release = func() {
for _, releaseFunc := range releaseFuncs {
releaseFunc()
}

// debug guard against double release call
if releaseCalled {
curStack := make([]byte, 20480)
curStack = curStack[:runtime.Stack(curStack, false)]

log.Errorw("double release call", "sector", sid, "fileType", ft, "prevStack", string(firstDonebuf), "curStack", string(curStack))
}

firstDonebuf = make([]byte, 20480)
firstDonebuf = firstDonebuf[:runtime.Stack(firstDonebuf, false)]

releaseCalled = true
}

cleanupOnError := func() { release() }
defer func() {
st.localLk.Unlock()
deferredDone()
cleanupOnError()
}()

for _, fileType := range ft.AllSet() {
Expand All @@ -453,13 +507,13 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
return nil, errPathNotFound
}

stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType})
overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen

stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType, overhead})
if err != nil {
return nil, xerrors.Errorf("getting local storage stat: %w", err)
}

overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen

if overhead-resvOnDisk < 0 {
log.Errorw("negative overhead vs on-disk data", "overhead", overhead, "on-disk", resvOnDisk, "id", id, "sector", sid, "fileType", fileType)
resvOnDisk = overhead
Expand All @@ -469,27 +523,28 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available))
}

p.reserved += overhead
p.reservations[sid.ID] |= fileType
resID := sectorFile{sid.ID, fileType}

prevDone := done
saveFileType := fileType
done = func() {
prevDone()
log.Debugw("reserve add", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved+overhead)

p.reserved += overhead
p.reservations[resID] = overhead

releaseFuncs = append(releaseFuncs, func() {
st.localLk.Lock()
defer st.localLk.Unlock()

log.Debugw("reserve release", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved-overhead)

p.reserved -= overhead
p.reservations[sid.ID] ^= saveFileType
if p.reservations[sid.ID] == storiface.FTNone {
delete(p.reservations, sid.ID)
}
}
delete(p.reservations, resID)
})
}

deferredDone = func() {}
return done, nil
// no errors, don't cleanup, caller will call release
cleanupOnError = func() {}

return release, nil
}

func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) {
Expand Down

0 comments on commit bc43bd6

Please sign in to comment.