Skip to content

Commit

Permalink
fix(generators): address problem when size ot pk is too small
Browse files Browse the repository at this point in the history
When pk is too small gemini had number of problems:
1. If it is one byte size, then generators are iterating over 0-255, while most of them in flights, load threads are getting stuck on waiting for valies, while generators can't provide new one
2. Murmur to get 100% partition coverage as result some partitions are not getting data while load threads still target them making treads stuck forever
  • Loading branch information
Dmitry Kropachev committed Jul 17, 2023
1 parent d351b0e commit 420a85c
Show file tree
Hide file tree
Showing 13 changed files with 307 additions and 44 deletions.
12 changes: 9 additions & 3 deletions cmd/gemini/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ func createGenerators(
var gs []*generators.Generator
for id := range schema.Tables {
table := schema.Tables[id]
pkVariations := table.PartitionKeys.ValueVariationsNumber(&partitionRangeConfig)

distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, seed, stdDistMean, oneStdDev)
distFunc, err := createDistributionFunc(partitionKeyDistribution, distributionSize, seed, stdDistMean, oneStdDev)
if err != nil {
return nil, err
}

gCfg := &generators.Config{
tablePartConfig := &generators.Config{
PartitionsRangeConfig: partitionRangeConfig,
PartitionsCount: distributionSize,
PartitionsDistributionFunc: distFunc,
Seed: seed,
PkUsedBufferSize: pkBufferReuseSize,
}
g := generators.NewGenerator(table, gCfg, logger.Named("generators"))
g := generators.NewGenerator(table, tablePartConfig, logger.Named("generators"))
if pkVariations < 2^32 {
// Low partition key variation can lead to having staled partitions
// Let's detect and mark them before running test
g.FindAndMarkStalePartitions()
}
gs = append(gs, g)
}
return gs, nil
Expand Down
48 changes: 32 additions & 16 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package main

import (
"encoding/binary"
"encoding/json"
"fmt"
"log"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/scylladb/gemini/pkg/builders"
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/jobs"
"github.com/scylladb/gemini/pkg/realrandom"
"github.com/scylladb/gemini/pkg/replication"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
Expand All @@ -40,8 +40,6 @@ import (
"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/stop"

crand "crypto/rand"

"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"
"github.com/pkg/errors"
Expand Down Expand Up @@ -188,10 +186,6 @@ func run(_ *cobra.Command, _ []string) error {
}()
}

if err = printSetup(intSeed, intSchemaSeed); err != nil {
return errors.Wrapf(err, "unable to print setup")
}

outFile, err := createFile(outFileArg, os.Stdout)
if err != nil {
return err
Expand All @@ -209,10 +203,15 @@ func run(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "cannot create schema")
}
} else {
schema = generators.GenSchema(schemaConfig, intSchemaSeed)
schema, intSchemaSeed, err = generateSchema(logger, schemaConfig, schemaSeed)
if err != nil {
return errors.Wrapf(err, "failed to create schema for seed %s", schemaSeed)
}
}

jsonSchema, _ := json.MarshalIndent(schema, "", " ")

printSetup(intSeed, intSchemaSeed)
fmt.Printf("Schema: %v\n", string(jsonSchema))

testCluster, oracleCluster := createClusters(cons, testHostSelectionPolicy, oracleHostSelectionPolicy, logger)
Expand Down Expand Up @@ -534,7 +533,7 @@ func init() {
rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
}

func printSetup(seed, schemaSeed uint64) error {
func printSetup(seed, schemaSeed uint64) {
tw := new(tabwriter.Writer)
tw.Init(os.Stdout, 0, 8, 2, '\t', tabwriter.AlignRight)
fmt.Fprintf(tw, "Seed:\t%d\n", seed)
Expand All @@ -550,16 +549,10 @@ func printSetup(seed, schemaSeed uint64) error {
fmt.Fprintf(tw, "Output file:\t%s\n", outFileArg)
}
tw.Flush()
return nil
}

func RealRandom() uint64 {
var b [8]byte
_, err := crand.Read(b[:])
if err != nil {
return uint64(time.Now().Nanosecond() * time.Now().Second())
}
return binary.LittleEndian.Uint64(b[:])
return rand.New(realrandom.Source).Uint64()
}

func validateSeed(seed string) error {
Expand All @@ -577,3 +570,26 @@ func seedFromString(seed string) uint64 {
val, _ := strconv.ParseUint(seed, 10, 64)
return val
}

// generateSchema generates schema, if schema seed is random and schema did not pass validation it regenerates it
func generateSchema(logger *zap.Logger, sc typedef.SchemaConfig, schemaSeed string) (schema *typedef.Schema, intSchemaSeed uint64, err error) {
intSchemaSeed = seedFromString(schemaSeed)
schema = generators.GenSchema(sc, intSchemaSeed)
err = schema.Validate(partitionCount)
if err == nil {
return schema, intSchemaSeed, nil
}
if schemaSeed != "random" {
// If user provided schema, allow to run it, but log warning
logger.Warn(errors.Wrap(err, "validation failed, running this test could end up in error or stale gemini").Error())
return schema, intSchemaSeed, nil
}

for err != nil {
intSchemaSeed = seedFromString(schemaSeed)
schema = generators.GenSchema(sc, intSchemaSeed)
err = schema.Validate(partitionCount)
}

return schema, intSchemaSeed, nil
}
59 changes: 49 additions & 10 deletions pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,30 +96,43 @@ func NewGenerator(table *typedef.Table, config *Config, logger *zap.Logger) *Gen
}

func (g *Generator) Get() *typedef.ValueWithToken {
return g.partitions.GetPartitionForToken(g.idxFunc()).get()
targetPart := g.GetPartitionForToken(g.idxFunc())
for targetPart.Stale() {
targetPart = g.GetPartitionForToken(g.idxFunc())
}
out := targetPart.get()
return out
}

func (g *Generator) GetPartitionForToken(token TokenIndex) *Partition {
return g.partitions[g.shardOf(uint64(token))]
}

// GetOld returns a previously used value and token or a new if
// the old queue is empty.
func (g *Generator) GetOld() *typedef.ValueWithToken {
return g.partitions.GetPartitionForToken(g.idxFunc()).getOld()
targetPart := g.GetPartitionForToken(g.idxFunc())
for targetPart.Stale() {
targetPart = g.GetPartitionForToken(g.idxFunc())
}
return targetPart.getOld()
}

// GiveOld returns the supplied value for later reuse unless
func (g *Generator) GiveOld(v *typedef.ValueWithToken) {
g.partitions.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v)
g.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v)
}

// GiveOlds returns the supplied value for later reuse unless
func (g *Generator) GiveOlds(v []*typedef.ValueWithToken) {
for _, token := range v {
g.partitions.GetPartitionForToken(TokenIndex(token.Token)).giveOld(token)
// GiveOlds returns the supplied values for later reuse unless
func (g *Generator) GiveOlds(tokens []*typedef.ValueWithToken) {
for _, token := range tokens {
g.GiveOld(token)
}
}

// ReleaseToken removes the corresponding token from the in-flight tracking.
func (g *Generator) ReleaseToken(token uint64) {
g.partitions.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
g.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
}

func (g *Generator) Start(stopFlag *stop.Flag) {
Expand All @@ -140,14 +153,36 @@ func (g *Generator) Start(stopFlag *stop.Flag) {
}()
}

func (g *Generator) FindAndMarkStalePartitions() {
r := rand.New(rand.NewSource(10))
nonStale := make([]bool, g.partitionCount)
for n := uint64(0); n < g.partitionCount*100; n++ {
values := CreatePartitionKeyValues(g.table, r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)
if err != nil {
g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error())
}
nonStale[g.shardOf(token)] = true
}

for idx, val := range nonStale {
if !val {
g.partitions[idx].MarkStale()
}
}
}

// fillAllPartitions guarantees that each partition was tested to be full
// at least once since the function started and before it ended.
// In other words no partition will be starved.
func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
pFilled := make([]bool, len(g.partitions))
allFilled := func() bool {
for _, filled := range pFilled {
for idx, filled := range pFilled {
if !filled {
if g.partitions[idx].Stale() {
continue
}
return false
}
}
Expand All @@ -162,7 +197,7 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
g.cntCreated++
idx := token % g.partitionCount
partition := g.partitions[idx]
if partition.inFlight.Has(token) {
if partition.Stale() || partition.inFlight.Has(token) {
continue
}
select {
Expand All @@ -178,3 +213,7 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
}
}
}

func (g *Generator) shardOf(token uint64) int {
return int(token % g.partitionCount)
}
31 changes: 21 additions & 10 deletions pkg/generators/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ type Partition struct {
wakeUpSignal chan<- struct{} // wakes up generator
closed bool
lock sync.RWMutex
isStale bool
}

func (s *Partition) MarkStale() {
s.isStale = true
s.Close()
}

func (s *Partition) Stale() bool {
return s.isStale
}

// get returns a new value and ensures that it's corresponding token
Expand Down Expand Up @@ -103,18 +113,23 @@ func (s *Partition) safelyGetOldValuesChannel() chan *typedef.ValueWithToken {
return s.oldValues
}

func (s *Partition) safelyCloseOldValuesChannel() {
func (s *Partition) Close() {
s.lock.RLock()
if s.closed {
s.lock.RUnlock()
return
}
s.lock.RUnlock()
s.lock.Lock()
if s.closed {
return
}
s.closed = true
close(s.values)
close(s.oldValues)
s.lock.Unlock()
}

func (s *Partition) Close() {
close(s.values)
s.safelyCloseOldValuesChannel()
}

type Partitions []*Partition

func (p Partitions) CloseAll() {
Expand All @@ -123,10 +138,6 @@ func (p Partitions) CloseAll() {
}
}

func (p Partitions) GetPartitionForToken(token TokenIndex) *Partition {
return p[uint64(token)%uint64(len(p))]
}

func NewPartitions(count, pkBufferSize int, wakeUpSignal chan struct{}) Partitions {
partitions := make(Partitions, count)
for i := 0; i < len(partitions); i++ {
Expand Down
65 changes: 65 additions & 0 deletions pkg/realrandom/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019 ScyllaDB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package realrandom

import (
crand "crypto/rand"
"encoding/binary"
"math/bits"
"time"

"golang.org/x/exp/rand"
)

var Source rand.Source

type crandSource struct{}

func (c *crandSource) Uint64() uint64 {
var out [8]byte
_, _ = crand.Read(out[:])
return binary.LittleEndian.Uint64(out[:])
}

func (c *crandSource) Seed(_ uint64) {}

type TimeSource struct {
source rand.Source
}

func NewTimeSource() *TimeSource {
now := time.Now()
return &TimeSource{
source: rand.NewSource(uint64(now.Nanosecond() * now.Second())),
}
}

func (c *TimeSource) Uint64() uint64 {
now := time.Now()
val := c.source.Uint64()
return bits.RotateLeft64(val^uint64(now.Nanosecond()*now.Second()), -int(val>>58))
}

func (c *TimeSource) Seed(_ uint64) {}

func init() {
var b [8]byte
_, err := crand.Read(b[:])
if err == nil {
Source = &crandSource{}
} else {
Source = NewTimeSource()
}
}
Loading

0 comments on commit 420a85c

Please sign in to comment.