forked from SaurabhAgarwala/DSAProject
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MerkleTree.py
237 lines (199 loc) · 8.01 KB
/
MerkleTree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
from HashFunction import *
import os
import math
import copy
class MerkleTree:
class Node:
def __init__(self, left, right, hash_val):
self.left = left
self.right = right
if left == None:
self._height = 0
else:
self._height = left.height + 1
self.hash = hash_val
self.parent = None
@property
def height(self):
return self._height
@height.setter
def height(self,height):
self._height = height
def get_sibling(self, sibling_hash):
if sibling_hash == self.left.hash:
return self.right
return self.left
def __init__(self, items):
self.is_built = False
self.root_hash = None
self.node_table = {}
self.max_height = math.ceil(math.log(len(items), 2))
self.leaves = list(map(self._leafify, map(self._md5sum, items)))
if items and len(items) > 0:
self.build_tree()
def _md5sum(self, data):
""" Returns an md5 hash of data.
If data is a file it is expected to contain its full path.
"""
data = str(data)
if os.path.isfile(data):
try:
f = open(data, 'rb')
except:
return 'ERROR: unable to open %s' % data
temp1 = ''
while True:
d = f.read(8192)
if not d:
break
#d=str(d)
#print(d)
temp=hash_fun_hex_str(hash_fun(d))
temp1=hash_update(temp1,temp)
f.close()
# Otherwise it could be either 1) a directory 2) miscellaneous data (like json)
else:
data2=bytes(data,"utf-8")
temp1=hash_fun_hex_str(hash_fun(data2))
return temp1
def _leafify(self, data):
leaf = self.Node(None, None, data)
leaf.height = 0
return leaf
def _handle_solo_node_case(self):
# The other method for building the tree will fail in a one node case, hence this method
if len(self.leaves) == 1:
solo_node = self.leaves.pop()
self.root_hash = solo_node.hash
self.node_table[solo_node.hash] = solo_node
def build_tree(self):
""" Builds a merkle tree by adding leaves one at a time to a stack,
and combining leaves in the stack when they are of the same height.
Expected items to be an array of type Node.
Also constructs node_table, a dict containing hashes that map to
individual nodes for auditing purposes.
"""
stack = []
self._handle_solo_node_case()
while self.root_hash == None:
if len(stack) >= 2 and stack[-1].height == stack[-2].height:
left = stack.pop()
right = stack.pop()
parent_hash = self._md5sum(left.hash + right.hash)
parent = self.Node(left, right, parent_hash)
self.node_table[parent_hash] = parent
left.parent = parent
right.parent = parent
if parent.height == self.max_height:
self.root_hash = parent.hash
stack.append(parent)
elif len(self.leaves) > 0:
leaf = self.leaves.pop()
self.node_table[leaf.hash] = leaf
stack.append(leaf)
# Handle case where last 2 nodes do not match in height by increasing height of last node.
else:
stack[-1].height += 1
self.is_built = True
def _get_leaf_hashes(self):
return [node.hash for node in self.node_table.values() if node.left == None]
def get_branch(self, item):
""" Returns an authentication path for an item (not hashed) in
the Merkle tree as a list in order from the top of the tree
to the bottom.
"""
hash_ = self._md5sum(item)
if not hash_ in self._get_leaf_hashes():
print("The requested item is not in the merkle tree.")
return
return self._get_branch_by_hash(hash_)
def _get_branch_by_hash(self, hash_):
""" Returns an authentication path as a list in order from the top
to the bottom of the tree (assumes preconditions have been checked).
"""
path = []
while hash_ != self.root_hash:
node = self.node_table[hash_]
parent = node.parent
sibling = parent.get_sibling(hash_)
path.append(sibling.hash)
hash_ = parent.hash
path.append(hash_)
path.reverse()
return path
def audit(self, data, proof_hashes):
""" Returns a boolean testing if a data (a file or object)
is contained in the merkle tree.
proof_hashes are the nodes to hash the hash of data with
in order from the top of the tree to the bottom
level. len(proof_hashes) is expected to be the height of the
tree, ceil(log2(n)), as one node is needed for proof per layer.
If the tree has not been built, returns False for any data.
"""
if proof_hashes:
if self.root_hash == None:
return False
hash_ = self._md5sum(data)
# A one element tree does not make much sense, but if one exists
# we simply need to check if the files hash is the correct root
if self.max_height == 0 and hash_ == self.root_hash:
return True
if self.max_height == 0 and hash_ != self.root_hash:
return False
#print('Proof Hashes', proof_hashes)
proof_hashes_cp = copy.copy(proof_hashes)
return self._audit(hash_, proof_hashes_cp)
else:
return False
def _audit(self, questioned_hash, proof_hashes):
""" Tests if questioned_hash is a member of the merkle tree by
hashing it with its test until the root hash is reached.
"""
proof_hash = proof_hashes.pop()
# print(proof_hash)
if not proof_hash in self.node_table.keys():
return False
test = self.node_table[proof_hash]
parent = test.parent
# Because the order in which the hashes are concatenated matters,
# we must test to see if questioned_hash is the "left" or "right"
# of its parent (the hash is always build as left + right).
if parent.left.hash == questioned_hash:
actual_hash = self._md5sum(questioned_hash + test.hash)
# temp_str=questioned_hash+' + '+test.hash+' = '+actual_hash
proof_to_user.append('2')
proof_to_user.append(test.hash)
elif parent.right.hash == questioned_hash:
actual_hash = self._md5sum(test.hash + questioned_hash)
# temp_str=test.hash+' + '+questioned_hash+' = '+actual_hash
proof_to_user.append('1')
proof_to_user.append(test.hash)
else:
return False
if actual_hash != parent.hash:
return False
if actual_hash == self.root_hash:
return True
return self._audit(actual_hash, proof_hashes)
def p2pFilesVerification(self,root_h,l):
m1=MerkleTree(l)
if m1.root_hash == root_h:
print("The files are authentic")
else:
print('Delete the files immediately, those are corrupted')
'''a=MerkleTree(['try.txt','try1.txt'])
print("root",a.root_hash)
p=a.node_table[a.root_hash].right
print("try",p.hash)
print(a.audit('try1.txt',a.get_branch('try1.txt')))
print(a.audit(17,a.get_branch(17)))
a.proof(a.root_hash,['try.txt','try2.txt'])
'''
proof_to_user=[]
a=MerkleTree(['testing_update.txt'])
print(a.root_hash)
# print(a.audit('try1.txt',a.get_branch('try1.txt')))
# f = open("proof_to_user1.txt","w")
# for i in range(len(proof_to_user)):
# f.write(str(proof_to_user[i]+'\n'))
# f.close()