-
Notifications
You must be signed in to change notification settings - Fork 1
/
openvpn_management.py
executable file
·176 lines (165 loc) · 7.03 KB
/
openvpn_management.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
"""
This module interacts with the openvpn management interface
"""
import socket
import select
import sys
import os
import re
# Tell the interpreter not to write .pyc bytecode files when it imports
# source modules from here on.
sys.dont_write_bytecode = True
class VPNmgmt(object):
    """
        Thin client for the openvpn management interface.

        The class creates a unix socket to the openvpn management server
        and interacts with that socket.  This is just socket logic.
    """

    def __init__(self, socket_path):
        """
            Establish a socket for eventual use connecting to
            a server at a certain socket_path.

            socket_path must be an absolute path STRING; nothing is
            connected until connect() is called.

            Raises ValueError when socket_path is not an absolute path.
        """
        # It might be better to validate on "is file, is socket" but
        # we do not presently use a real socket in testing, so all
        # this tests is, is this an absolute-pathed filename STRING.
        # The file may not even exist.
        if not os.path.isabs(socket_path):
            raise ValueError('only unix sockets are currently supported')
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.socket_path = socket_path

    def connect(self):
        """
            Connect to the server's socket and clear out the welcome
            banner that has no information of use in it.
        """
        # Use a generous blocking timeout for the connect + banner read.
        self.sock.settimeout(10.0)
        self.sock.connect(self.socket_path)
        # openvpn management gives a welcome message on connect.
        # toss it, and go into nonblocking mode.
        self.sock.recv(1024)
        self.sock.settimeout(0.0)

    def disconnect(self):
        """
            Gracefully leave the connection if possible.

            Both the 'quit' command and the shutdown are best-effort:
            the peer may already be gone, and that must not stop us
            from closing our end.
        """
        try:
            self._send('quit')
        except socket.error:
            pass
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except (socket.error, OSError):
            pass
        self.sock.close()

    def _send(self, command, stopon=None, timeout=1):
        """
            Since the interactions with openvpn management are mostly
            call-and-response, this is the internal call to go and do
            exactly that.  Send a command, read back from the server
            until it stops, or you hit something that you declare as
            a stopping point.  Then, return that (sometimes multiline)
            string to the caller.

            command: text to send; '\\r\\n' is appended for you.
            stopon:  optional str/bytes marker.  Reading ends as soon as
                     the marker appears anywhere in the data read so far.
            timeout: seconds to wait for more data before deciding the
                     server has finished talking.

            Returns everything that was read, decoded as UTF-8.  This is
            an empty string if the server never answered.
        """
        if stopon is not None and not isinstance(stopon, bytes):
            stopon = stopon.encode('utf-8')
        self.sock.send(f'{command}\r\n'.encode('utf-8'))
        data = b''
        while True:
            # keep on reading until hitting timeout, in case the server is
            # being slow.  stopon is used to make this faster: you don't
            # need to wait for timeout if you know you already have the data.
            # Be careful that stopon doesn't match the data, though.
            readable, _writable, _errored = select.select(
                [self.sock], [], [], timeout)
            if not readable:
                # Timed out: the server has nothing more to say.
                break
            buf = self.sock.recv(1024)
            if not buf:
                # Zero-length read: the peer closed the connection.
                break
            data += buf
            if stopon is not None and stopon in data:
                break
        return data.decode('utf-8')

    @staticmethod
    def _success(input_string):
        """
            Indicates if the openvpn management server reports a
            success (True) or failure (False) condition after
            we run a command.  Accepts str or bytes.
            https://openvpn.net/community-resources/management-interface/
        """
        if not isinstance(input_string, bytes):
            input_string = input_string.encode('utf-8')
        return input_string.startswith(b'SUCCESS')

    def status(self):
        """
            Return the status as reported by the openvpn server.
            This will return status 2 (a comma delimited format)
            This is just to make parsing easier.
        """
        # NOTE(review): the stop marker is the bare word 'END', so reading
        # would stop early if 'END' showed up inside the payload (say, in
        # a username) - confirm whether a stricter marker is wanted.
        return self._send('status 2', 'END')

    def getusers(self):
        """
            Returns a dict of the users connected to the VPN:
            {
                username: (str username, str 'client-ipv4:port')
            }
            The value is the tuple of regex groups for that user's line.

            Note that we are using the strict definition of 'connected'
            as folks in the 'ROUTING_TABLE' (fully established, have a
            client IP), and not the 'CLIENT_LIST' (half-established,
            without a client IP).  The reason for this is, there are
            a lot of script kiddies who will be knocking on your front
            door, and kicking them off when they're in half-established
            just ends up causing noise.  You should deal with them via
            some sort of blocklist instead of this script.  Our focus
            is removing terminated users who have real connections.
        """
        data = self.status()
        users = {}
        if re.findall('^TITLE', data):
            # version 2 or 3, the first thing is a TITLE header;
            # We don't need multiline for that check.
            #
            # The ROUTING_TABLE search DOES need multiline, since data is
            # a stream and we're 'abusing' ^ by anchoring to newlines in
            # the middle.
            matched_lines = re.findall(
                r'^ROUTING_TABLE[,\t].+[,\t](.+)[,\t](\d+\.\d+\.\d+\.\d+\:\d+)[,\t]',
                data, re.MULTILINE)
        else:
            # version 1 or an error condition.
            matched_lines = re.findall(
                r',(.+),(\d+\.\d+\.\d+\.\d+\:\d+)',
                data)
        for matchset in matched_lines:
            username = matchset[0]
            if username == 'UNDEF':
                # This is subtle so needs a lot of explaining.
                #
                # openvpn source code, src/openvpn/multi.c
                # multi_print_status calls tls_common_name and if tls_multi
                # is NULL / not fully established (which can happen due to
                # deferred authentication and races between auth /
                # negotiations and status delays), the username in
                # ROUTING_TABLE can come out as 'UNDEF' briefly, usually
                # "one iteration of a status file update."
                #
                # Since it's a hard-coded word in the source we're
                # replicating that here and ignoring any UNDEF user, since
                # it's not really a user.  This DOES mean that if you have a
                # user with the certificate Common Name of literal string
                # 'UNDEF' that we're going to suppress that they're
                # connecting, but, you deserve to lose.
                continue
            # Pass along all the variables in matchset.
            # This makes "field 1" here be "field 1" later.
            users[username] = matchset
        return users

    def kill(self, user, commit=False):
        """
            Disconnect a single user.  Does not check
            if they were there or not.

            With commit=False nothing is killed; a harmless 'version'
            command is sent instead.

            Returns a tuple (success, response): success is True/False
            depending on if the server reports a success or not, and
            response is the raw text the server sent back.
        """
        if commit:
            ret = self._send(f'kill {user}', stopon='\r\n')
        else:
            # Send something useless, just to make testing
            # behave a bit more like real life.
            ret = self._send('version')
        return (self._success(ret), ret)