Skip to content

Commit

Permalink
ruff cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Marshall-Hallenbeck committed Sep 24, 2023
1 parent 443c363 commit 14be9d5
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 114 deletions.
28 changes: 20 additions & 8 deletions nxc/helpers/bloodhound.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable


def add_user_bh(user, domain, logger, config):
"""
Adds a user to the BloodHound graph database.
Args:
user (str or list): The username of the user or a list of user dictionaries.
domain (str): The domain of the user.
logger (Logger): The logger object for logging messages.
config (ConfigParser): The configuration object for accessing BloodHound settings.
Returns:
None
Raises:
AuthError: If the provided Neo4J credentials are not valid.
ServiceUnavailable: If Neo4J is not available on the specified URI.
Exception: If an unexpected error occurs with Neo4J.
"""
users_owned = []
if isinstance(user, str):
users_owned.append({"username": user.upper(), "domain": domain.upper()})
else:
users_owned = user

# TODO: fix this, we shouldn't be doing conditional imports
if config.get("BloodHound", "bh_enabled") != "False":
try:
from neo4j.v1 import GraphDatabase
except Exception as e:
logger.debug(f"Exception while importing neo4j.v1: {e}")
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable

uri = f"bolt://{config.get('BloodHound', 'bh_uri')}:{config.get('BloodHound', 'bh_port')}"

driver = GraphDatabase.driver(
Expand Down
2 changes: 1 addition & 1 deletion nxc/modules/wcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def check_laps(self):

for subkey in subkeys:
value = self.reg_query_value(self.dce, self.connection, lapsv1_key_name + "\\" + subkey, "DllName")
if type(value) == str and "laps\\cse\\admpwd.dll" in value.lower():
if isinstance(value, str) and "laps\\cse\\admpwd.dll" in value.lower():
reasons.append(f"{lapsv1_key_name}\\...\\DllName matches AdmPwd.dll")
success = True
laps_path = "\\".join(value.split("\\")[1:-1])
Expand Down
2 changes: 1 addition & 1 deletion nxc/nxcdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ def do_proto(self, proto):
def help_proto():
help_string = """
proto [smb|mssql|winrm]
*unimplemented protocols: ftp, rdp, ldap, ssh
*unimplemented protocols: Ftp, rdp, ldap, ssh
Changes nxcdb to the specified protocol
"""
print_help(help_string)
Expand Down
2 changes: 1 addition & 1 deletion nxc/parsers/nmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

# right now we are only referencing the port numbers, not the service name, but this should be sufficient for 99% cases
protocol_dict = {
"ftp": {"ports": [21], "services": ["ftp"]},
"Ftp": {"ports": [21], "services": ["Ftp"]},
"ssh": {"ports": [22, 2222], "services": ["ssh"]},
"smb": {"ports": [139, 445], "services": ["netbios-ssn", "microsoft-ds"]},
"ldap": {"ports": [389, 636], "services": ["ldap", "ldaps"]},
Expand Down
19 changes: 7 additions & 12 deletions nxc/protocols/ftp.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from nxc.config import process_secret
from nxc.connection import *
from nxc.connection import connection
from nxc.helpers.logger import highlight
from nxc.logger import NXCAdapter
from ftplib import FTP, error_reply, error_temp, error_perm, error_proto
from ftplib import FTP


class ftp(connection):
class Ftp(connection):
def __init__(self, args, db, host):
self.protocol = "FTP"
self.remote_version = None
Expand Down Expand Up @@ -46,15 +48,8 @@ def create_conn_obj(self):
self.conn = FTP()
try:
self.conn.connect(host=self.host, port=self.args.port)
except error_reply:
return False
except error_temp:
return False
except error_perm:
return False
except error_proto:
return False
except socket.error:
except Exception as e:
self.logger.debug(f"Error connecting to FTP host: {e}")
return False
return True

Expand Down
2 changes: 1 addition & 1 deletion nxc/protocols/ftp/proto_args.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
def proto_args(parser, std_parser, module_parser):
ftp_parser = parser.add_parser("ftp", help="own stuff using FTP", parents=[std_parser, module_parser])
ftp_parser = parser.add_parser("Ftp", help="own stuff using FTP", parents=[std_parser, module_parser])
ftp_parser.add_argument("--port", type=int, default=21, help="FTP port (default: 21)")

cgroup = ftp_parser.add_argument_group("FTP Access", "Options for enumerating your access")
Expand Down
24 changes: 12 additions & 12 deletions nxc/protocols/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from impacket.smbconnection import SMBConnection, SessionError

from nxc.config import process_secret, host_info_colors
from nxc.connection import *
from nxc.connection import connection
from nxc.helpers.bloodhound import add_user_bh
from nxc.logger import NXCAdapter, nxc_logger
from nxc.protocols.ldap.bloodhound import BloodHound
Expand Down Expand Up @@ -285,7 +285,7 @@ def enum_host_info(self):
try:
# DC's seem to want us to logoff first, windows workstations sometimes reset the connection
self.conn.logoff()
except:
except Exception:
pass

if self.args.domain:
Expand Down Expand Up @@ -347,7 +347,7 @@ def kerberos_login(
self.nthash = nthash

if self.password == "" and self.args.asreproast:
hash_tgt = KerberosAttacks(self).getTGT_asroast(self.username)
hash_tgt = KerberosAttacks(self).get_tgt_asroast(self.username)
if hash_tgt:
self.logger.highlight(f"{hash_tgt}")
with open(self.args.asreproast, "a+") as hash_asreproast:
Expand Down Expand Up @@ -455,7 +455,7 @@ def kerberos_login(
color="magenta" if error in ldap_error_status else "red",
)
return False
except:
except Exception as e:
error_code = str(e).split()[-2][:-1]
self.logger.fail(
f"{self.domain}\\{self.username}:{self.password if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}",
Expand All @@ -465,7 +465,7 @@ def kerberos_login(
else:
error_code = str(e).split()[-2][:-1]
self.logger.fail(
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}",
f'{self.domain}\\{self.username}\' from ccache\' if useCache else \':%s\' % (kerb_pass if not self.config.get(\'nxc\', \'audit_mode\') else self.config.get(\'nxc\', \'audit_mode\') * 8)} {ldap_error_status[error_code] if error_code in ldap_error_status else ""}',
color="magenta" if error_code in ldap_error_status else "red",
)
return False
Expand All @@ -476,7 +476,7 @@ def plaintext_login(self, domain, username, password):
self.domain = domain

if self.password == "" and self.args.asreproast:
hash_tgt = KerberosAttacks(self).getTGT_asroast(self.username)
hash_tgt = KerberosAttacks(self).get_tgt_asroast(self.username)
if hash_tgt:
self.logger.highlight(f"{hash_tgt}")
with open(self.args.asreproast, "a+") as hash_asreproast:
Expand Down Expand Up @@ -528,7 +528,7 @@ def plaintext_login(self, domain, username, password):
if not self.args.local_auth:
add_user_bh(self.username, self.domain, self.logger, self.config)
return True
except:
except Exception as e:
error_code = str(e).split()[-2][:-1]
self.logger.fail(
f"{self.domain}\\{self.username}:{self.password if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}",
Expand Down Expand Up @@ -567,7 +567,7 @@ def hash_login(self, domain, username, ntlm_hash):
self.domain = domain

if self.hash == "" and self.args.asreproast:
hash_tgt = KerberosAttacks(self).getTGT_asroast(self.username)
hash_tgt = KerberosAttacks(self).get_tgt_asroast(self.username)
if hash_tgt:
self.logger.highlight(f"{hash_tgt}")
with open(self.args.asreproast, "a+") as hash_asreproast:
Expand Down Expand Up @@ -906,7 +906,7 @@ def asreproast(self):
pass
if len(answers) > 0:
for user in answers:
hash_TGT = KerberosAttacks(self).getTGT_asroast(user[0])
hash_TGT = KerberosAttacks(self).get_tgt_asroast(user[0])
self.logger.highlight(f"{hash_TGT}")
with open(self.args.asreproast, "a+") as hash_asreproast:
hash_asreproast.write(hash_TGT + "\n")
Expand Down Expand Up @@ -993,7 +993,7 @@ def kerberoasting(self):

if len(answers) > 0:
self.logger.display(f"Total of records returned {len(answers):d}")
TGT = KerberosAttacks(self).getTGT_kerberoasting()
TGT = KerberosAttacks(self).get_tgt_kerberoasting()
dejavue = []
for (
SPN,
Expand All @@ -1017,9 +1017,9 @@ def kerberoasting(self):
self.kdcHost,
TGT["KDC_REP"],
TGT["cipher"],
TGT["sessionKey"],
TGT["session_key"],
)
r = KerberosAttacks(self).outputTGS(
r = KerberosAttacks(self).output_tgs(
tgs,
oldSessionKey,
sessionKey,
Expand Down
Loading

0 comments on commit 14be9d5

Please sign in to comment.