Skip to content

Commit

Permalink
Merge pull request #3339 from anurag6/events
Browse files Browse the repository at this point in the history
Add additional events for system monitoring
  • Loading branch information
anarkiwi authored Nov 13, 2019
2 parents 4d4e185 + 4435f18 commit f15cb17
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 38 deletions.
15 changes: 15 additions & 0 deletions clib/mininet_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class FaucetTestBase(unittest.TestCase):
LINKS_PER_HOST = 1
SOFTWARE_ONLY = False
NETNS = False
EVENT_LOGGER_TIMEOUT = 120

FPING_ARGS = FPING_ARGS
FPING_ARGS_SHORT = ' '.join((FPING_ARGS, '-i10 -p100 -t100'))
Expand Down Expand Up @@ -132,6 +133,7 @@ class FaucetTestBase(unittest.TestCase):
rand_dpids = set()
event_sock = None
faucet_config_path = None
event_log = None

def __init__(self, name, config, root_tmpdir, ports_sock, max_test_load, port_order=None):
super(FaucetTestBase, self).__init__(name)
Expand Down Expand Up @@ -223,6 +225,19 @@ def _set_vars(self):
def _set_log_level(self, name='faucet'):
self._set_var(name, 'FAUCET_LOG_LEVEL', str(self.LOG_LEVEL))

def _enable_event_log(self, timeout=None):
"""Creates a file event.log in the test folder that tracks all events sent out by faucet to the event socket"""
if not timeout:
timeout = self.EVENT_LOGGER_TIMEOUT
self.event_log = os.path.join(self.tmpdir, 'event.log')
controller = self._get_controller()
sock = self.env['faucet']['FAUCET_EVENT_SOCK']
# Relying on a timeout seems a bit brittle;
# as an alternative we might possibly use something like
# `with popen(cmd...) as proc`to clean up on exceptions
controller.cmd(mininet_test_util.timeout_cmd(
'nc -U %s > %s &' % (sock, self.event_log), timeout))

def _read_yaml(self, yaml_path):
with open(yaml_path) as yaml_file:
content = yaml.safe_load(yaml_file.read())
Expand Down
15 changes: 12 additions & 3 deletions faucet/dp.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class DP(Conf):
'lldp_beacon': {},
# Config for LLDP beacon service.
'metrics_rate_limit_sec': 0,
# Rate limit metric updates - don't update metrics if last update was less than this many seconds ago.
# Rate limit metric updates if last update was less than this many seconds ago.
'faucet_dp_mac': valve_packet.FAUCET_MAC,
# MAC address of packets sent by FAUCET, not associated with any VLAN.
'combinatorial_port_flood': False,
Expand Down Expand Up @@ -343,11 +343,13 @@ def __str__(self):
return self.name

def clone_dyn_state(self, prev_dp):
"""Clone dynamic state for this dp"""
self.dyn_running = prev_dp.dyn_running
self.dyn_up_port_nos = set(prev_dp.dyn_up_port_nos)
self.dyn_last_coldstart_time = prev_dp.dyn_last_coldstart_time

def check_config(self):
"""Check configuration of this dp"""
super(DP, self).check_config()
test_config_condition(not isinstance(self.dp_id, int), (
'dp_id must be %s not %s' % (int, type(self.dp_id))))
Expand Down Expand Up @@ -567,7 +569,8 @@ def _configure_tables(self):
scale_factor *= self.port_table_scale_factor

# Always multiple of min_wildcard_table_size
size = (int(scale_factor / self.min_wildcard_table_size) + 1) * self.min_wildcard_table_size
table_size_multiple = int(scale_factor / self.min_wildcard_table_size) + 1
size = table_size_multiple * self.min_wildcard_table_size

if not table_config.exact_match:
size = max(size, self.min_wildcard_table_size)
Expand Down Expand Up @@ -876,6 +879,11 @@ def resolve_stack_topology(self, dps, meta_dp_state):
if self.tunnel_acls:
self.finalize_tunnel_acls(dps)

def get_node_link_data(self):
"""Return network stacking graph as a node link representation"""
graph = self.stack.get('graph', None)
return networkx.json_graph.node_link_data(graph)

def stack_longest_path_to_root_len(self):
"""Return length of the longest path to root in the stack."""
if not self.stack or not self.stack_root_name:
Expand Down Expand Up @@ -987,7 +995,8 @@ def reset_refs(self, vlans=None):
self.vlans = {}
for vlan in vlans.values():
vlan.reset_ports(self.ports.values())
if vlan.get_ports() or vlan.reserved_internal_vlan or vlan.dot1x_assigned or vlan._id in router_vlans:
if (vlan.get_ports() or vlan.reserved_internal_vlan or
vlan.dot1x_assigned or vlan._id in router_vlans):
self.vlans[vlan.vid] = vlan

def resolve_port(self, port_name):
Expand Down
2 changes: 1 addition & 1 deletion faucet/faucet_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _loop(self, sock, _addr):
self.logger.info('event client connected')
while True:
event = self.event_q.get()
event_bytes = bytes('\n'.join((json.dumps(event), '')).encode('UTF-8'))
event_bytes = bytes('\n'.join((json.dumps(event, default=str), '')).encode('UTF-8'))
try:
sock.sendall(event_bytes)
except (socket.error, IOError) as err:
Expand Down
21 changes: 21 additions & 0 deletions faucet/valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ def _update_stack_link_state(self, ports, now, other_valves):
valve.flood_manager.update_stack_topo(port_stack_up, self.dp, port)
if stack_changes:
self.logger.info('%u stack ports changed state' % stack_changes)
notify_dps = {}
for valve in stacked_valves:
valve.update_tunnel_flowrules()
if not valve.dp.dyn_running:
Expand All @@ -615,6 +616,22 @@ def _update_stack_link_state(self, ports, now, other_valves):
ofmsgs_by_valve[valve].extend(valve.flood_manager.add_vlan(vlan))
for port in valve.dp.stack_ports:
ofmsgs_by_valve[valve].extend(valve.host_manager.del_port(port))
path_port = valve.dp.shortest_path_port(valve.dp.stack_root_name)
notify_dps.setdefault(valve.dp.name, {})['root_hop_port'] = (
path_port.number if path_port else None)

# Find the first valve with a valid stack and trigger notification.
for valve in stacked_valves:
graph = valve.dp.get_node_link_data()
if graph:
self.notify(
{'STACK_TOPO_CHANGE': {
'stack_root': valve.dp.stack_root_name,
'graph': graph,
'dps': notify_dps
}})
break

return ofmsgs_by_valve

def update_tunnel_flowrules(self):
Expand Down Expand Up @@ -912,6 +929,10 @@ def port_delete(self, port_num):

def _reset_lacp_status(self, port):
self._set_var('port_lacp_status', port.dyn_lacp_up, labels=self.dp.port_labels(port.number))
self.notify(
{'LAG_CHANGE': {
'port_no': port.number,
'status': port.dyn_lacp_up}})

def lacp_down(self, port, cold_start=False):
"""Return OpenFlow messages when LACP is down on a port."""
Expand Down
11 changes: 8 additions & 3 deletions faucet/valves_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def _apply_configs(self, new_dps, now, delete_dp):
valve = self.new_valve(new_dp)
if valve is None:
continue
self._notify({'CONFIG_CHANGE': {'restart_type': 'new'}}, dp=new_dp)
valve.update_config_metrics()
self.valves[dp_id] = valve
if delete_dp is not None:
Expand All @@ -267,16 +268,20 @@ def _send_ofmsgs_by_valve(self, ofmsgs_by_valve):
for valve, ofmsgs in ofmsgs_by_valve.items():
self.send_flows_to_dp_by_id(valve, ofmsgs)

def _notify(self, event_dict):
def _notify(self, event_dict, dp=None):
"""Send an event notification."""
self.notifier.notify(0, str(0), event_dict)
if dp:
self.notifier.notify(dp.dp_id, dp.name, event_dict)
else:
self.notifier.notify(0, str(0), event_dict)

def request_reload_configs(self, now, new_config_file, delete_dp=None):
"""Process a request to load config changes."""
if self.config_watcher.content_changed(new_config_file):
self.logger.info('configuration %s changed, analyzing differences', new_config_file)
result = self.load_configs(now, new_config_file, delete_dp=delete_dp)
self._notify({'CONFIG_CHANGE': {'success': result}})
self._notify({'CONFIG_CHANGE': {'success': result,
'dps_config': self.meta_dp_state.top_conf}})
else:
self.logger.info('configuration is unchanged, not reloading')
self.metrics.faucet_config_load_error.set(0)
Expand Down
45 changes: 14 additions & 31 deletions tests/integration/mininet_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ class FaucetUntaggedTest(FaucetTest):
# pylint: disable=invalid-name
CONFIG = CONFIG_BOILER_UNTAGGED

EVENT_LOGGER_TIMEOUT = 120 # Timeout for event logger process

def setUp(self): # pylint: disable=invalid-name
super(FaucetUntaggedTest, self).setUp()
Expand Down Expand Up @@ -155,24 +154,14 @@ def verify_events_log(self, event_log, timeout=10):

def test_untagged(self):
"""All hosts on the same untagged VLAN should have connectivity."""
event_log = os.path.join(self.tmpdir, 'event.log')
controller = self._get_controller()
sock = self.env['faucet']['FAUCET_EVENT_SOCK']
# Relying on a timeout seems a bit brittle;
# as an alternative we might possibly use something like
# `with popen(cmd...) as proc` to clean up on exceptions
controller.cmd(mininet_test_util.timeout_cmd(
'nc -U %s > %s &' % (sock, event_log), self.EVENT_LOGGER_TIMEOUT))
self._enable_event_log()
self.ping_all_when_learned()
self.flap_all_switch_ports()
self.verify_traveling_dhcp_mac()
self.gauge_smoke_test()
self.prometheus_smoke_test()
self.assertGreater(os.path.getsize(event_log), 0)
controller.cmd(
mininet_test_util.timeout_cmd(
'nc -U %s' % sock, 10))
self.verify_events_log(event_log)
self.assertGreater(os.path.getsize(self.event_log), 0)
self.verify_events_log(self.event_log)


class Faucet8021XBaseTest(FaucetTest):
Expand Down Expand Up @@ -263,8 +252,6 @@ class Faucet8021XBaseTest(FaucetTest):
nfv_intf = None
nfv_portno = None

event_log = ''

@staticmethod
def _priv_mac(host_id):
two_byte_port_num = '%04x' % host_id
Expand Down Expand Up @@ -318,13 +305,7 @@ def setUp(self):
self.nfv_pids.append(int(self.nfv_host.lastPid))
self.radius_log_path = self.start_freeradius()
self.nfv_pids.append(int(self.nfv_host.lastPid))

self.event_log = os.path.join(self.tmpdir, 'event.log')
controller = self._get_controller()
sock = self.env['faucet']['FAUCET_EVENT_SOCK']
controller.cmd(
mininet_test_util.timeout_cmd(
'nc -U %s > %s &' % (sock, self.event_log), 300))
self._enable_event_log(300)

def tearDown(self, ignore_oferrors=False):
for pid in self.nfv_pids:
Expand Down Expand Up @@ -5743,11 +5724,7 @@ class FaucetTaggedIPv4RouteTest(FaucetTaggedTest):
"""

def test_tagged(self):
event_log = os.path.join(self.tmpdir, 'event.log')
controller = self._get_controller()
sock = self.env['faucet']['FAUCET_EVENT_SOCK']
controller.cmd(mininet_test_util.timeout_cmd(
'nc -U %s > %s &' % (sock, event_log), 120))
self._enable_event_log()
host_pair = self.hosts_name_ordered()[:2]
first_host, second_host = host_pair
first_host_routed_ip = ipaddress.ip_interface('10.0.1.1/24')
Expand All @@ -5765,7 +5742,7 @@ def test_tagged(self):
self.port_map['port_4'], 'native_vlan', vid,
restart=True, cold_start=False)
self.wait_until_matching_lines_from_file(
r'.+L3_LEARN.+10.0.0.[12].+', event_log)
r'.+L3_LEARN.+10.0.0.[12].+', self.event_log)


class FaucetTaggedTargetedResolutionIPv4RouteTest(FaucetTaggedIPv4RouteTest):
Expand Down Expand Up @@ -7485,11 +7462,14 @@ def test_lacp_port_down(self):
self.wait_for_all_lacp_up()

def test_untagged(self):
"""All untagged hosts in stack topology can reach each other."""
"""All untagged hosts in stack topology can reach each other, LAG_CHANGE event emitted."""
self._enable_event_log()
for _ in range(3):
self.wait_for_all_lacp_up()
self.verify_stack_hosts()
self.flap_all_switch_ports()
# Check for presence of LAG_CHANGE event in event socket log
self.wait_until_matching_lines_from_file(r'.+LAG_CHANGE.+', self.event_log)

def test_dyn_fail(self):
"""Test lacp fail on reload with dynamic lacp status."""
Expand Down Expand Up @@ -7687,7 +7667,8 @@ def setUp(self): # pylint: disable=invalid-name
self.start_net()

def test_untagged(self):
"""Stack loop prevention works and hosts can ping each other."""
"""Stack loop prevention works and hosts can ping each other, STACK_TOPO_CHANGE event emitted."""
self._enable_event_log()
self.verify_stack_up()
self.verify_stack_has_no_loop()
self.retry_net_ping()
Expand All @@ -7701,6 +7682,8 @@ def test_untagged(self):
self.one_stack_port_down(dpid, dp_name, port)
self.retry_net_ping()
self.one_stack_port_up(dpid, dp_name, port)
# Check for presence of STACK_TOPO_CHANGE event in event socket log
self.wait_until_matching_lines_from_file(r'.+STACK_TOPO_CHANGE.+', self.event_log)


class FaucetSingleStack4RingOfDPTest(FaucetStackRingOfDPTest):
Expand Down

0 comments on commit f15cb17

Please sign in to comment.