Skip to content

Commit

Permalink
nff-go in Azure with failsafe
Browse files Browse the repository at this point in the history
For example:

config := flow.Config{
	DPDKArgs: []string {
		"--vdev=net_vdev_netvsc0,iface=eth1",
		"--vdev=net_vdev_netvsc1,iface=eth2",
	},
}
flow.CheckFatal(flow.SystemInit(&config))

Signed-off-by: Stefan Rinkes <[email protected]>
  • Loading branch information
darinkes committed Jan 30, 2020
1 parent 7ff09bf commit 6288b76
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 31 deletions.
51 changes: 30 additions & 21 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
)

var openFlowsNumber = uint32(0)
var createdPorts []port
var createdPorts map[uint16](*port)
var portPair map[types.IPv4Address](*port)
var schedState *scheduler
var vEach [10][vBurstSize]uint8
Expand Down Expand Up @@ -725,14 +725,19 @@ func SystemInit(args *Config) error {
return err
}
// Init Ports
createdPorts = make([]port, low.GetPortsNumber(), low.GetPortsNumber())
for i := range createdPorts {
createdPorts[i].port = uint16(i)
createdPorts = make(map[uint16](*port))
i := low.GetNextPort(0)
for i < low.RteMaxEthPorts {
createdPorts[i] = &port{}
createdPorts[i].port = i
common.LogDebug(common.Initialization, "Found Port ID:", i)

if maxInIndex > low.CheckPortRSS(createdPorts[i].port) {
createdPorts[i].InIndex = low.CheckPortRSS(createdPorts[i].port)
} else {
createdPorts[i].InIndex = maxInIndex
}
i = low.GetNextPort(i + 1)
}
portPair = make(map[types.IPv4Address](*port))
ioDevices = make(map[string]interface{})
Expand Down Expand Up @@ -886,17 +891,18 @@ func SetReceiverFile(filename string, repcount int32) (OUT *Flow) {
// Receive queue will be added to port automatically.
// Returns new opened flow with received packets
func SetReceiver(portId uint16) (OUT *Flow, err error) {
if portId >= uint16(len(createdPorts)) {
return nil, common.WrapWithNFError(nil, "Requested receive port exceeds number of ports which can be used by DPDK (bind to DPDK).", common.ReqTooManyPorts)
port, ok := createdPorts[portId]
if !ok {
return nil, common.WrapWithNFError(nil, "Requested receive port not found.", common.ReqTooManyPorts)
}
if createdPorts[portId].willReceive {
if port.willReceive {
return nil, common.WrapWithNFError(nil, "Requested receive port was already set to receive. Two receives from one port are prohibited.", common.MultipleReceivePort)
}
createdPorts[portId].wasRequested = true
createdPorts[portId].willReceive = true
rings := low.CreateRings(burstSize*sizeMultiplier, createdPorts[portId].InIndex)
addReceiver(portId, rings, createdPorts[portId].InIndex)
return newFlow(rings, createdPorts[portId].InIndex), nil
port.wasRequested = true
port.willReceive = true
rings := low.CreateRings(burstSize*sizeMultiplier, port.InIndex)
addReceiver(portId, rings, port.InIndex)
return newFlow(rings, port.InIndex), nil
}

// SetReceiverOS adds function receive from Linux interface to flow graph.
Expand Down Expand Up @@ -1045,11 +1051,14 @@ func SetSender(IN *Flow, portId uint16) error {
if err := checkFlow(IN); err != nil {
return err
}
if portId >= uint16(len(createdPorts)) {
return common.WrapWithNFError(nil, "Requested send port exceeds number of ports which can be used by DPDK (bind to DPDK).", common.ReqTooManyPorts)

port, ok := createdPorts[portId]
if !ok {
return common.WrapWithNFError(nil, "Requested send port not found.", common.ReqTooManyPorts)
}
createdPorts[portId].wasRequested = true
if createdPorts[portId].sendRings == nil {

port.wasRequested = true
if port.sendRings == nil {
// To allow consequent sends to one port, we need to create a send ring
// for the first, and then all the consequent sends should be merged
// with already created send ring.
Expand All @@ -1062,13 +1071,13 @@ func SetSender(IN *Flow, portId uint16) error {
max = createdPorts[i].InIndex
}
}
createdPorts[portId].sendRings = low.CreateRings(burstSize*sizeMultiplier, max)
addSender(portId, createdPorts[portId].sendRings, IN.inIndexNumber)
port.sendRings = low.CreateRings(burstSize*sizeMultiplier, max)
addSender(portId, port.sendRings, IN.inIndexNumber)
}
// For a typical 40 GB card, like Intel 710 series, one core should be able
// to handle all the TX without problems. So we merged all income flows to created
// ring which will be send.
mergeOneFlow(IN, createdPorts[portId].sendRings)
mergeOneFlow(IN, port.sendRings)
return nil
}

Expand Down Expand Up @@ -1312,7 +1321,7 @@ func GetNameByPort(port uint16) (string, error) {
func SetIPForPort(port uint16, ip types.IPv4Address) error {
for i := range createdPorts {
if createdPorts[i].port == port && createdPorts[i].wasRequested {
portPair[ip] = &createdPorts[i]
portPair[ip] = createdPorts[i]
return nil
}
}
Expand Down Expand Up @@ -1554,7 +1563,7 @@ func recvXDP(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
func processKNI(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
srk := parameters.(*KNIParameters)
if srk.linuxCore == true {
coreID = schedState.cores[createdPorts[srk.port.PortId].KNICoreIndex].id
coreID = schedState.cores[createdPorts[uint16(srk.port.PortId)].KNICoreIndex].id
}
low.SrKNI(uint16(srk.port.PortId), flag, coreID, srk.recv, srk.out, srk.send, srk.in, &srk.stats)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/low/low.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,14 @@ func FreeKNI(port uint16) error {
return nil
}

const (
RteMaxEthPorts = C.RTE_MAX_ETHPORTS
)

func GetNextPort(port uint16) uint16 {
return uint16(C.rte_eth_find_next_owned_by(C.uint16_t(port), C.RTE_ETH_DEV_NO_OWNER))
}

// GetPortsNumber gets total number of available Ethernet devices.
func GetPortsNumber() int {
return int(C.rte_eth_dev_count())
Expand Down
9 changes: 0 additions & 9 deletions internal/low/low.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,6 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools,
rx_rings = 0;
}

if (port >= rte_eth_dev_count())
return -1;

struct rte_eth_conf port_conf_default = {
.rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN,
.mq_mode = ETH_MQ_RX_RSS },
Expand Down Expand Up @@ -940,9 +937,6 @@ bool check_hwtxchecksum_capability(uint16_t port_id) {
DEV_TX_OFFLOAD_TCP_CKSUM;
struct rte_eth_dev_info dev_info;

if (port_id >= rte_eth_dev_count())
return false;

memset(&dev_info, 0, sizeof(dev_info));
rte_eth_dev_info_get(port_id, &dev_info);
return (dev_info.tx_offload_capa & flags) == flags;
Expand All @@ -952,9 +946,6 @@ bool check_hwrxpackets_timestamp_capability(uint16_t port_id) {
uint64_t flags = DEV_RX_OFFLOAD_TIMESTAMP;
struct rte_eth_dev_info dev_info;

if (port_id >= rte_eth_dev_count())
return false;

memset(&dev_info, 0, sizeof(dev_info));
rte_eth_dev_info_get(port_id, &dev_info);
return (dev_info.rx_offload_capa & flags) == flags;
Expand Down
2 changes: 1 addition & 1 deletion internal/low/low_mlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
package low

/*
#cgo LDFLAGS: -lrte_distributor -lrte_reorder -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_jobstats -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_vhost -lrte_ip_frag -lrte_cfgfile -Wl,--whole-archive -Wl,--start-group -lrte_kvargs -lrte_mbuf -lrte_hash -lrte_ethdev -lrte_mempool -lrte_ring -lrte_mempool_ring -lrte_eal -lrte_cmdline -lrte_net -lrte_bus_pci -lrte_pci -lrte_bus_vdev -lrte_timer -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio -lrte_pmd_cxgbe -lrte_pmd_enic -lrte_pmd_i40e -lrte_pmd_fm10k -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ena -lrte_pmd_ring -lrte_pmd_af_packet -lrte_pmd_null -libverbs -lmnl -lmlx4 -lmlx5 -lrte_pmd_mlx4 -lrte_pmd_mlx5 -Wl,--end-group -Wl,--no-whole-archive -lrt -lm -ldl -lnuma
#cgo LDFLAGS: -lrte_distributor -lrte_reorder -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_jobstats -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_vhost -lrte_ip_frag -lrte_cfgfile -Wl,--whole-archive -Wl,--start-group -lrte_kvargs -lrte_mbuf -lrte_hash -lrte_ethdev -lrte_gso -lrte_mempool -lrte_ring -lrte_mempool_ring -lrte_eal -lrte_cmdline -lrte_net -lrte_bus_pci -lrte_pci -lrte_bus_vdev -lrte_timer -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio -lrte_pmd_cxgbe -lrte_pmd_enic -lrte_pmd_i40e -lrte_pmd_fm10k -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ena -lrte_pmd_ring -lrte_pmd_af_packet -lrte_pmd_null -lrte_pmd_failsafe -lrte_pmd_tap -lrte_pmd_vdev_netvsc -lrte_bus_vmbus -lrte_pmd_netvsc -libverbs -lmnl -lmlx4 -lmlx5 -lrte_pmd_mlx4 -lrte_pmd_mlx5 -Wl,--end-group -Wl,--no-whole-archive -lrt -lm -ldl -lnuma
*/
import "C"

0 comments on commit 6288b76

Please sign in to comment.