import os
from pymongo.encryption_options import AutoEncryptionOpts
from pymongo.mongo_client import MongoClient
from pymongo.encryption import Algorithm, ClientEncryption
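# The same master key that was used to create
# the encryption key.
def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty. Failing here gives a
            clear message; the previous ``str(os.getenv(...))`` wrapping turned
            a missing variable into the literal string ``"None"``, which then
            surfaced as ``open("None")`` or a connection attempt to a ``"None"``
            URI.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"environment variable {name} must be set")
    return value


path = _require_env("MASTER_KEY")
with open(path, "rb") as f:
    local_master_key = f.read()
# A "local" KMS provider key must be exactly 96 bytes; checking up front gives a
# clearer failure than the one raised on first use of the key.
if len(local_master_key) != 96:
    raise ValueError(
        f"MASTER_KEY file {path!r} must hold a 96-byte key, "
        f"got {len(local_master_key)} bytes"
    )
kms_providers = {
    "local": {
        "key": local_master_key  # local_master_key variable from the previous step
    },
}
# The MongoDB namespace (db.collection) used to store
# the encryption data keys.
key_vault_namespace = "encryption.__voltpass"
key_vault_db_name, key_vault_coll_name = key_vault_namespace.split(".", 1)
# bypass_auto_encryption=True disable automatic encryption but keeps
# the automatic _decryption_ behavior. bypass_auto_encryption will
# also disable spawning mongocryptd.
auto_encryption_opts = AutoEncryptionOpts(
    kms_providers, key_vault_namespace, bypass_auto_encryption=True
)
# The MongoClient used to read/write application data.
client = MongoClient(
    _require_env("MONGO_LOCAL"),
    auto_encryption_opts=auto_encryption_opts,
)
db = client.voltpass
uc = db.users
# Clear old data. NOTE: this runs on every import of this module and wipes the
# users collection unconditionally.
uc.drop()
# Set up the key vault (key_vault_namespace) for this example.
key_vault = client[key_vault_db_name][key_vault_coll_name]
# NOTE(review): dropping the key vault deletes the data encryption key(s).
# Any ciphertext written under a previous key becomes permanently
# undecryptable — fine for a throwaway example, not for persistent data.
key_vault.drop()
# Ensure that two data keys cannot share the same keyAltName.
key_vault.create_index(
    "keyAltNames",
    unique=True,
    partialFilterExpression={"keyAltNames": {"$exists": True}},
)
client_encryption = ClientEncryption(
    kms_providers,
    key_vault_namespace,
    # The MongoClient to use for reading/writing to the key vault.
    # This can be the same MongoClient used by the main application.
    client,
    # The CodecOptions class used for encrypting and decrypting.
    # This should be the same CodecOptions instance you have configured
    # on MongoClient, Database, or Collection.
    uc.codec_options,
)
# Create a new data key for the encryptedField. This succeeds on every run only
# because the key vault was dropped above; with the unique keyAltNames index in
# place a second key with the same alt name would otherwise be rejected.
data_key_id = client_encryption.create_data_key(
    "local", key_alt_names=["voltpass_encryption"]
)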
# Explicitly encrypt a field:Deterministic
def d_encrypt(value):
    """Encrypt *value* with the deterministic AEAD algorithm.

    Uses the module-level ``client_encryption`` and ``data_key_id``.
    Deterministic encryption yields the same ciphertext for the same
    plaintext, which is what makes equality queries on the field possible;
    the trade-off is that anyone who can read the stored ciphertext can
    tell which documents share a value.
    """
    algorithm = Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Deterministic
    return client_encryption.encrypt(value, algorithm, key_id=data_key_id)
# Explicitly encrypt a field:Random
def r_encrypt(value):
    """Encrypt *value* with the randomized AEAD algorithm.

    Uses the module-level ``client_encryption`` and ``data_key_id``.
    Randomized encryption produces a different ciphertext each call, so
    equal plaintexts are not linkable from the stored data; the field can
    be decrypted but not queried by equality.
    """
    algorithm = Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Random
    return client_encryption.encrypt(value, algorithm, key_id=data_key_id)