Skip to content

Commit

Permalink
Merge pull request #651 from intel-go/gregory/performance
Browse files Browse the repository at this point in the history
Implemented performance improvements
  • Loading branch information
gshimansky authored Sep 18, 2019
2 parents 7fd91a6 + 3d0edd9 commit b78cb92
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 72 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# emacs
*~
doc
GPATH
GTAGS
GRTAGS
71 changes: 55 additions & 16 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func addReceiver(portId uint16, out low.Rings, inIndexNumber int32) {
par.port = low.GetPort(portId)
par.out = out
par.status = make([]int32, maxRecv, maxRecv)
schedState.addFF("receiver", nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats)
schedState.addFF("receiverPort"+string(portId), nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats)
}

type receiveOSParameters struct {
Expand Down Expand Up @@ -256,18 +256,23 @@ func addFastGenerator(out low.Rings, generateFunction GenerateFunction,
}

type sendParameters struct {
in low.Rings
port uint16
anyway bool
stats common.RXTXStats
in low.Rings
port uint16
unrestrictedClones bool
stats common.RXTXStats
sendThreadIndex int
}

func addSender(port uint16, in low.Rings, inIndexNumber int32) {
par := new(sendParameters)
par.port = port
par.in = in
par.anyway = schedState.anyway
schedState.addFF("sender", nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats)
for iii := 0; iii < sendCPUCoresPerPort; iii++ {
par := new(sendParameters)
par.port = port
par.in = in
par.unrestrictedClones = schedState.unrestrictedClones
par.sendThreadIndex = iii
schedState.addFF("senderPort"+string(port)+"Thread"+string(iii),
nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats)
}
}

type sendOSParameters struct {
Expand Down Expand Up @@ -468,6 +473,7 @@ var sizeMultiplier uint
var schedTime uint
var hwtxchecksum, hwrxpacketstimestamp, setSIGINTHandler bool
var maxRecv int
var sendCPUCoresPerPort, tXQueuesNumberPerPort int

type port struct {
wasRequested bool // has user requested any send/receive operations at this port
Expand Down Expand Up @@ -531,7 +537,7 @@ type Config struct {
// Limits parallel instances: 1 for a single instance, 1000 to let the RSS count determine the number of instances.
MaxInIndex int32
// Scheduler should clone functions even if it can lead to reordering.
// This option should be switch off for all high level reassembling like TCP or HTTP
// This option should be switched off for all high level reassembling like TCP or HTTP
RestrictedCloning bool
// If application uses EncapsulateHead or DecapsulateHead functions, L2 pointers
// should be reinitialized each time a packet is received or generated. This can be removed if
Expand Down Expand Up @@ -570,6 +576,19 @@ type Config struct {
// SystemStartScheduler waits for SIGINT notification and calls
// SystemStop after it. It is enabled by default.
NoSetSIGINTHandler bool
// Number of CPU cores to be occupied by Send routines. It is
// necessary to set TXQueuesNumberPerPort to a reasonably big
// number which can be divided by SendCPUCoresPerPort.
SendCPUCoresPerPort int
// Number of transmit queues to use on network card. By default it
// is minimum of NIC supported TX queues number and 2. If this
// value is specified and NIC doesn't support this number of TX
// queues, initialization fails.
TXQueuesNumberPerPort int
// Controls scheduler interval in milliseconds. Default value is
// 500. Lower values allow faster reaction to changing traffic but
// increase scheduling overhead.
SchedulerInterval uint
}

// SystemInit initializes the system. This function should always be called before graph construction.
Expand All @@ -588,12 +607,26 @@ func SystemInit(args *Config) error {
cpus = common.GetDefaultCPUs(CPUCoresNumber)
}

tXQueuesNumberPerPort = args.TXQueuesNumberPerPort
if tXQueuesNumberPerPort == 0 {
tXQueuesNumberPerPort = 2
}

// Number of send routines per port; zero (unset) falls back to one.
sendCPUCoresPerPort = args.SendCPUCoresPerPort
if sendCPUCoresPerPort == 0 {
	sendCPUCoresPerPort = 1
}
// Validate after the default has been applied so that user-supplied
// values are checked as well. Previously this check was nested inside
// the default branch, where the divisor is always 1 and the check could
// never fail.
if tXQueuesNumberPerPort%sendCPUCoresPerPort != 0 {
	return common.WrapWithNFError(nil, "TXQueuesNumberPerPort should be divisible by SendCPUCoresPerPort",
		common.BadArgument)
}

schedulerOff := args.DisableScheduler
schedulerOffRemove := args.PersistentClones
stopDedicatedCore := args.StopOnDedicatedCore
hwtxchecksum = args.HWTXChecksum
hwrxpacketstimestamp = args.HWRXPacketsTimestamp
anyway := !args.RestrictedCloning
unrestrictedClones := !args.RestrictedCloning

mbufNumber := uint(8191)
if args.MbufNumber != 0 {
Expand All @@ -610,7 +643,12 @@ func SystemInit(args *Config) error {
sizeMultiplier = args.RingSize
}

schedTime = 500
if args.SchedulerInterval != 0 {
schedTime = args.SchedulerInterval
} else {
schedTime = 500
}

if args.ScaleTime != 0 {
schedTime = args.ScaleTime
}
Expand Down Expand Up @@ -702,7 +740,7 @@ func SystemInit(args *Config) error {
common.LogTitle(common.Initialization, "------------***------ Initializing scheduler -----***------------")
StopRing := low.CreateRings(burstSize*sizeMultiplier, maxInIndex /* Maximum possible rings */)
common.LogDebug(common.Initialization, "Scheduler can use cores:", cpus)
schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, anyway)
schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, unrestrictedClones)

// Set HW offloading flag in packet package
packet.SetHWTXChecksumFlag(hwtxchecksum)
Expand Down Expand Up @@ -737,7 +775,7 @@ func SystemInitPortsAndMemory() error {
for i := range createdPorts {
if createdPorts[i].wasRequested {
if err := low.CreatePort(createdPorts[i].port, createdPorts[i].willReceive,
true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex); err != nil {
true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex, tXQueuesNumberPerPort); err != nil {
return err
}
}
Expand Down Expand Up @@ -1690,7 +1728,8 @@ func pcopy(parameters interface{}, inIndex []int32, stopper [2]chan int, report

func send(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
srp := parameters.(*sendParameters)
low.Send(srp.port, srp.in, srp.anyway, flag, coreID, &srp.stats)
low.Send(srp.port, srp.in, srp.unrestrictedClones, flag, coreID, &srp.stats,
srp.sendThreadIndex, sendCPUCoresPerPort)
}

func sendOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
Expand Down
54 changes: 31 additions & 23 deletions flow/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const generatePauseStep = 0.1
const process = 1
const stopRequest = 2
const wasStopped = 9
const printPortStatistics = false

// TODO "5" and "39" constants derived empirically. Need to investigate more elegant thresholds.
const RSSCloneMin = 5
Expand Down Expand Up @@ -156,26 +157,26 @@ func (scheduler *scheduler) addFF(name string, ucfn uncloneFlowFunction, Cfn cFl
}

type scheduler struct {
ff []*flowFunction
cores []core
off bool
offRemove bool
anyway bool
stopDedicatedCore bool
StopRing low.Rings
usedCores uint8
checkTime uint
debugTime uint
Dropped uint
maxPacketsToClone uint32
stopFlag int32
maxRecv int
Timers []*Timer
nAttempts []uint64
pAttempts []uint64
maxInIndex int32
measureRings low.Rings
coreIndex int
ff []*flowFunction
cores []core
off bool
offRemove bool
unrestrictedClones bool
stopDedicatedCore bool
StopRing low.Rings
usedCores uint8
checkTime uint
debugTime uint
Dropped uint
maxPacketsToClone uint32
stopFlag int32
maxRecv int
Timers []*Timer
nAttempts []uint64
pAttempts []uint64
maxInIndex int32
measureRings low.Rings
coreIndex int
}

type core struct {
Expand All @@ -184,7 +185,7 @@ type core struct {
}

func newScheduler(cpus []int, schedulerOff bool, schedulerOffRemove bool, stopDedicatedCore bool,
stopRing low.Rings, checkTime uint, debugTime uint, maxPacketsToClone uint32, maxRecv int, anyway bool) *scheduler {
stopRing low.Rings, checkTime uint, debugTime uint, maxPacketsToClone uint32, maxRecv int, unrestrictedClones bool) *scheduler {
coresNumber := len(cpus)
// Init scheduler
scheduler := new(scheduler)
Expand All @@ -200,7 +201,7 @@ func newScheduler(cpus []int, schedulerOff bool, schedulerOffRemove bool, stopDe
scheduler.debugTime = debugTime
scheduler.maxPacketsToClone = maxPacketsToClone
scheduler.maxRecv = maxRecv
scheduler.anyway = anyway
scheduler.unrestrictedClones = unrestrictedClones
scheduler.pAttempts = make([]uint64, len(scheduler.cores), len(scheduler.cores))

return scheduler
Expand Down Expand Up @@ -405,6 +406,13 @@ func (scheduler *scheduler) schedule(schedTime uint) {
common.LogDebug(common.Debug, "---------------")
common.LogDebug(common.Debug, "System is using", scheduler.usedCores, "cores now.", uint8(len(scheduler.cores))-scheduler.usedCores, "cores are left available.")
low.Statistics(float32(scheduler.debugTime) / 1000)
if printPortStatistics {
for i := range createdPorts {
if createdPorts[i].wasRequested {
low.PortStatistics(createdPorts[i].port)
}
}
}
for i := range scheduler.ff {
scheduler.ff[i].printDebug(schedTime)
}
Expand Down Expand Up @@ -549,7 +557,7 @@ func (scheduler *scheduler) schedule(schedTime uint) {
ffi.removed = false
continue
}
if ffi.inIndex[0] == 1 && scheduler.anyway && ffi.checkInputRingClonable(scheduler.maxPacketsToClone) &&
if ffi.inIndex[0] == 1 && scheduler.unrestrictedClones && ffi.checkInputRingClonable(scheduler.maxPacketsToClone) &&
ffi.checkOutputRingClonable(scheduler.maxPacketsToClone) &&
(ffi.increasedSpeed == 0 || ffi.increasedSpeed > ffi.reportedState.V.Packets) {
if scheduler.pAttempts[ffi.cloneNumber+1] == 0 {
Expand Down
29 changes: 23 additions & 6 deletions internal/low/low.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,19 @@ func SrKNI(port uint16, flag *int32, coreID int, recv bool, OUT Rings, send bool
}

// Send - dequeue packets and send.
func Send(port uint16, IN Rings, anyway bool, flag *int32, coreID int, stats *common.RXTXStats) {
func Send(port uint16, IN Rings, unrestrictedClones bool, flag *int32, coreID int, stats *common.RXTXStats,
sendThreadIndex, totalSendTreads int) {
if C.rte_eth_dev_socket_id(C.uint16_t(port)) != C.int(C.rte_lcore_to_socket_id(C.uint(coreID))) {
common.LogWarning(common.Initialization, "Send port", port, "is on remote NUMA node to polling thread - not optimal performance.")
}
C.nff_go_send(C.uint16_t(port), C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), C.int32_t(len(IN))), C.int32_t(len(IN)),
C.bool(anyway), (*C.int)(unsafe.Pointer(flag)), C.int(coreID), (*C.RXTXStats)(unsafe.Pointer(stats)))
C.nff_go_send(C.uint16_t(port),
C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), C.int32_t(len(IN))),
C.int32_t(len(IN)),
C.bool(unrestrictedClones),
(*C.int)(unsafe.Pointer(flag)), C.int(coreID),
(*C.RXTXStats)(unsafe.Pointer(stats)),
C.int32_t(sendThreadIndex),
C.int32_t(totalSendTreads))
}

// Stop - dequeue and free packets.
Expand Down Expand Up @@ -590,11 +597,16 @@ func GetPortsNumber() int {
}

func CheckPortRSS(port uint16) int32 {
return int32(C.check_port_rss(C.uint16_t(port)))
return int32(C.check_max_port_rx_queues(C.uint16_t(port)))
}

// CheckPortMaxTXQueues returns, converted to int32, the result of the C
// helper check_max_port_tx_queues for the given port.
func CheckPortMaxTXQueues(port uint16) int32 {
	txQueues := C.check_max_port_tx_queues(C.uint16_t(port))
	return int32(txQueues)
}

// CreatePort initializes a new port using global settings and parameters.
func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, hwrxpacketstimestamp bool, inIndex int32) error {
func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum,
hwrxpacketstimestamp bool, inIndex int32, tXQueuesNumberPerPort int) error {
var mempools **C.struct_rte_mempool
if willReceive {
m := CreateMempools("receive", inIndex)
Expand All @@ -603,7 +615,7 @@ func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, h
mempools = nil
}
if C.port_init(C.uint16_t(port), C.bool(willReceive), mempools,
C._Bool(promiscuous), C._Bool(hwtxchecksum), C._Bool(hwrxpacketstimestamp), C.int32_t(inIndex)) != 0 {
C._Bool(promiscuous), C._Bool(hwtxchecksum), C._Bool(hwrxpacketstimestamp), C.int32_t(inIndex), C.int32_t (tXQueuesNumberPerPort)) != 0 {
msg := common.LogError(common.Initialization, "Cannot init port ", port, "!")
return common.WrapWithNFError(nil, msg, common.FailToInitPort)
}
Expand Down Expand Up @@ -697,6 +709,11 @@ func Statistics(N float32) {
C.statistics(C.float(N))
}

// PortStatistics prints statistics for the given NIC port by delegating to
// the C helper portStatistics.
func PortStatistics(port uint16) {
	portID := C.uint16_t(port)
	C.portStatistics(portID)
}

// ReportMempoolsState prints used and free space of mempools.
func ReportMempoolsState() {
for _, m := range usedMempools {
Expand Down
Loading

0 comments on commit b78cb92

Please sign in to comment.