-
Notifications
You must be signed in to change notification settings - Fork 86
/
cvecorder.py
476 lines (395 loc) · 18.9 KB
/
cvecorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
from europi import *
from time import ticks_diff, ticks_ms, sleep_ms
from random import randint, uniform
from europi_script import EuroPiScript
import machine
import json
import gc
import os
import micropython
import framebuf
'''
CVecorder
author: Nik Ansell (github.com/gamecat69)
digital_in: Clock input
analog_in: Incoming CV
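button_1: Arm recording on the active channel. The channel is cleared immediately; recording starts at the next pattern start and stops automatically at the end of the pattern.
button_2: Short Press: Cycle through CV recorder channels. Long Press (0.5 seconds): Clear the current bank. Long Press (2 seconds): Clear all banks. (Note: the long-press handler is currently commented out in the code, so only the short-press action is active.)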
output_1: CV record / playback
output_2: CV record / playback
output_3: CV record / playback
output_4: CV record / playback
output_5: CV record / playback
output_6: CV record / playback
'''
'''
Ideas / to do:
- Add a morph capability using k2: left reduces CV values, right increases. Don’t adjust all values at once, so odds, then evens with each slight movement
'''
# Needed if using europi_script
class CVecorder(EuroPiScript):
    '''
    Clocked CV recorder / player with several channels organised in banks.

    The analogue input is sampled on every clock edge (both the rising and the
    falling edge of the digital input call handleClock, so the step counter
    advances twice per clock pulse). Channels that are not recording play back
    the stored value for the current step on their CV output.

    Each bank is persisted to its own JSON state file. Values are stored on
    disk as integers (volts * 100) and kept in memory as volts.
    '''

    # errno reported by open() when the file does not exist
    _ENOENT = 2

    def __init__(self):
        # Needed if using europi_script
        super().__init__()

        # Micropython heap fragmentation notes:
        # - The pico has very limited memory and in some cases needs to be managed carefully
        # - In some cases you can get a MemoryError even if there is enough free memory, this is
        #   because micropython could not find enough contiguous memory because the heap has
        #   become fragmented
        # - Avoid heap fragmentation as much as possible by initializing and creating variables
        #   early, then updating, rather than creating new and destroying old variables.

        # Initialize variables
        self.step = 0
        self.stepLength = 64
        self.clockStep = 0
        self.ActiveCvr = 0
        self.ActiveBank = 0
        self.resetTimeout = 1000
        self.debug = False
        self.CvIn = 0
        self.bankToSave = 0
        self.initTest = False
        self.debugLogging = False
        self.errorString = ' '
        self.numCVR = 5  # Number of CV recorder channels - zero based
        self.numCVRBanks = 5  # Number of CV recording channel banks - zero based

        # Logging parameters
        self.logFilePrefix = 'cvecorder_debug'
        self.maxLogFiles = 5
        self.logFileList = []
        self.currentLogFile = ''
        self.maxLogFileName = self.logFilePrefix + str(self.maxLogFiles) + '.log'

        # rotate log files
        self.rotateLog()
        if self.debugLogging:
            self.writeToDebugLog(f"[init] Firing up!.")

        #self.CVR = [] # CV recorder channels
        #self.CvRecording = [] # CV recorder flags

        # Load CV Recordings from a previously stored state on disk or initialize if blank
        self.loadState()

        # Test routine, pick a random bank n times and save, then load the state
        if self.initTest:
            print(micropython.mem_info("level"))
            for n in range(3000):
                # Clear vars
                #self.CvRecording = []
                print(f"Running test: {n}")
                self.ActiveBank = randint(0, self.numCVRBanks)
                self.ActiveCvr = randint(0, self.numCVR)
                # Fill every step of the pattern (the last step used to be skipped)
                for i in range(self.stepLength):
                    self.CVR[self.ActiveBank][self.ActiveCvr][i] = uniform(0.0, 9.99)
                    #print(f"[{self.ActiveBank}][{self.ActiveCvr}][{i}] = {self.CVR[self.ActiveBank][self.ActiveCvr][i]}")
                self.bankToSave = self.ActiveBank
                self.saveState()
                self.loadState()

        @din.handler
        def dInput():
            # Rising clock edge: advance one step and count the clock pulse
            self.handleClock()
            self.clockStep += 1

        @din.handler_falling
        def endClock():
            # Falling clock edge: advance one more step
            self.handleClock()

        @b1.handler
        def b1Pressed():
            # Arm recording on the active channel; it starts when the pattern reaches step 0
            self.CvRecording[self.ActiveCvr] = 'pending'
            # Clear the channel in place (no new list, to limit heap fragmentation)
            channel = self.CVR[self.ActiveBank][self.ActiveCvr]
            for n in range(self.stepLength):
                channel[n] = 0

        # # B2 Long press
        # @b2.handler_falling
        # def b2PressedLong():
        #     # Issue: This seems to get triggered randomly sometimes when the button is not pressed! This causes all CV banks to be cleared! :(
        #     # Leave this commented out until the problem is isolated and resolved
        #     # 2000ms press clears all banks
        #     if ticks_diff(ticks_ms(), b2.last_pressed()) > 2000:
        #         self.confirmDelete('all')
        #         self.clearCvrs('all')
        #         #self.bankToSave = self.ActiveBank
        #         #self.saveState()
        #         # reverse the ActiveCvr increment caused by the initial button press
        #         if self.ActiveCvr > 0:
        #             self.ActiveCvr -= 1
        #         else:
        #             self.ActiveCvr = self.numCVR
        #     # 500ms press clears the active bank
        #     elif ticks_diff(ticks_ms(), b2.last_pressed()) > 500:
        #         self.confirmDelete(self.ActiveBank)
        #         self.clearCvrs(self.ActiveBank)
        #         self.bankToSave = self.ActiveBank
        #         self.saveState()
        #         if self.debugLogging:
        #             self.writeToDebugLog(f"[b2PressedLong] > 500 Calling saveState() for bank {self.bankToSave}.")
        #         # reverse the ActiveCvr increment caused by the initial button press
        #         if self.ActiveCvr > 0:
        #             self.ActiveCvr -= 1
        #         else:
        #             self.ActiveCvr = self.numCVR

        # B2 short press
        @b2.handler
        def b2Pressed():
            # Change the active recorder channel, wrapping back to the first one
            if self.ActiveCvr < self.numCVR:
                self.ActiveCvr += 1
            else:
                self.ActiveCvr = 0

    def _blankBank(self):
        '''Return a new bank: one list of stepLength zeros per recorder channel.'''
        return [[0] * self.stepLength for _ in range(self.numCVR + 1)]

    def confirmDelete(self, bank):
        '''
        Show a confirmation prompt and block until button 1 is held.

        bank: bank number shown in the prompt, or 'all' for every bank.
        '''
        # Show confirm text on screen
        oled.fill(0)
        if str(bank) == 'all':
            oled.text('Clear ALL banks?', 0, 0, 1)
        else:
            oled.text(f'Clear bank {bank}?', 0, 0, 1)
        oled.text('CONFIRM: Hold B1', 0, 15, 1)
        oled.show()
        # Wait for button 1
        while b1.value() != 1:
            sleep_ms(250)

    def handleClock(self):
        '''
        Process one clock edge: sample the input, record and/or play the
        current step on every channel, then advance the step counter.

        When the pattern wraps, any running recording is stopped and the
        active bank is saved.
        '''
        # Sample input to 2 decimal places
        self.CvIn = round(ain.read_voltage(), 2)

        # Start recording if pending and on first step
        if self.step == 0 and self.CvRecording[self.ActiveCvr] == 'pending':
            self.CvRecording[self.ActiveCvr] = 'true'

        bank = self.CVR[self.ActiveBank]
        for i in range(self.numCVR + 1):
            if self.CvRecording[i] == 'true':
                # Record into (and monitor on) the channel that is actually recording.
                # Indexing by ActiveCvr here would overwrite another channel if the
                # user pressed B2 while a recording was in progress.
                bank[i][self.step] = self.CvIn
                cvs[i].voltage(self.CvIn)
            else:
                cvs[i].voltage(bank[i][self.step])

        # Reset step number at stepLength -1 as pattern arrays are zero-based
        if self.step < self.stepLength - 1:
            self.step += 1
            return

        # End of pattern: reset step to zero, stop recording and save to local storage
        self.step = 0
        recorded = False
        for i in range(self.numCVR + 1):
            # Stop every recording channel, not only the active one, so a channel
            # cannot be left recording forever after a channel switch
            if self.CvRecording[i] == 'true':
                self.CvRecording[i] = 'false'
                recorded = True
        if recorded:
            self.bankToSave = self.ActiveBank
            self.saveState()
            if self.debugLogging:
                self.writeToDebugLog(f"[handleClock] Calling saveState() for bank {self.bankToSave}.")

    def clearCvrs(self, bank):
        '''
        Zero every step of every channel in a bank and save it.

        bank: bank index to clear, or 'all' to clear every bank.
        '''
        for b in range(self.numCVRBanks + 1):
            # skip bank unless 'all' is passed
            if b != bank and bank != 'all':
                continue
            if self.initTest:
                print('Clearing bank: ' + str(b))
            # Set all CV values to zero, in place
            for i in range(self.numCVR + 1):
                for n in range(self.stepLength):
                    self.CVR[b][i][n] = 0
            # Save the cleared bank to local storage
            self.bankToSave = b
            self.saveState()
            if self.debugLogging:
                self.writeToDebugLog(f"[clearCvrs] Calling saveState() for bank {self.bankToSave}.")

    # Currently not used, but keeping in this script for future use
    def initCvrs(self):
        '''
        Rebuild all banks as zero-filled patterns and reset the recording flags.

        Both lists are rebuilt from scratch, so there is exactly one recording
        flag per channel regardless of the number of banks.
        '''
        self.CVR = [self._blankBank() for _ in range(self.numCVRBanks + 1)]
        self.CvRecording = ['false'] * (self.numCVR + 1)

    def saveState(self):
        '''
        Write bank self.bankToSave to its state file as JSON.

        Values are scaled to integers (volts * 100) for the duration of the
        write and always converted back to volts afterwards, even if the write
        raises. MemoryError during serialization/writing is retried.
        '''
        # generate output filename
        outputFile = f"saved_state_{self.__class__.__qualname__}_{self.bankToSave}.txt"
        bank = self.CVR[self.bankToSave]

        # Convert each value to an int by multiplying by 100. This saves on storage and memory
        # a little. round() rather than truncation, so e.g. 0.29 is stored as 29, not 28.
        for i in range(len(bank)):
            bank[i] = [int(round(x * 100)) for x in bank[i]]

        try:
            if self.initTest:
                print('Saving state for bank: ' + str(self.bankToSave))

            # Trigger garbage collection to minimize memory use
            gc.collect()

            # Show free memory if running a debug test
            if self.initTest:
                print(self.free())

            # Write the value to the state file
            maxRetries = 6
            attempts = 0
            while attempts < maxRetries:
                attempts += 1
                try:
                    # Create json object of current CV bank
                    jsonState = json.dumps(bank)
                    if self.debugLogging:
                        self.writeToDebugLog(f"[saveState] Saving state for bank: {str(self.bankToSave)}. Size: {len(jsonState)}")
                    with open(outputFile, 'w') as file:
                        # Attempt write data to state on disk, then break from while loop
                        # if the return (num bytes written) > 0
                        if file.write(jsonState) > 0:
                            #self.errorString = ' '
                            if self.debugLogging:
                                self.writeToDebugLog(f"[saveState] Bank {str(self.bankToSave)} saved OK")
                            break
                except MemoryError as e:
                    self.errorString = 'w'
                    if self.initTest:
                        print(f'[{attempts}] Error: Memory allocation failed, retrying: {e}')
                    if self.debugLogging:
                        self.writeToDebugLog(f"[saveState] Error: Memory allocation failed, retrying: {e}")
                        print(micropython.mem_info("level"))
        finally:
            # Convert from int back to float. Done in finally so an unexpected error
            # cannot leave the in-memory bank scaled by 100.
            for i in range(len(bank)):
                bank[i] = [x / 100 for x in bank[i]]

    def loadState(self):
        '''
        Load every bank from its state file, (re)building self.CVR and
        self.CvRecording.

        - If a bank has no state file, it is initialized with zeros and saved.
        - Any other failure (I/O error, corrupt or wrongly-shaped file) is
          retried; if it keeps failing the bank is zeroed in memory only, so an
          existing file is not overwritten by the load itself.
        '''
        self.CVR = []  # CV recorder channels
        # init cvRecording list
        self.CvRecording = ['false'] * (self.numCVR + 1)  # CV recorder flags

        for b in range(self.numCVRBanks + 1):
            # Create a new array for the bank
            self.CVR.append([])

            fileName = f"saved_state_{self.__class__.__qualname__}_{b}.txt"
            maxRetries = 2
            attempts = 0
            loaded = False

            while attempts < maxRetries and not loaded:
                try:
                    # save state exists for this bank, load it
                    with open(fileName, 'r') as file:
                        # read state from file into json object
                        jsonData = file.read()

                    if self.initTest:
                        print(f"Loading previous state for bank: {str(b)}. Size: {len(jsonData)}")
                    self.showLoadingScreen(str(b))
                    if self.debugLogging:
                        self.writeToDebugLog(f"[loadState] [{attempts}] Loading previous state for bank: {str(b)}. Size: {len(jsonData)}")

                    stored = json.loads(jsonData)
                    # Reject data that does not match the expected layout; the rest of the
                    # script indexes every channel and step without further checks
                    if len(stored) != self.numCVR + 1:
                        raise ValueError('unexpected channel count')
                    for channel in stored:
                        if len(channel) != self.stepLength:
                            raise ValueError('unexpected step count')

                    # populate the bank, converting values from int back to float
                    self.CVR[b] = [[x / 100 if x > 0 else 0 for x in channel] for channel in stored]
                    loaded = True

                except OSError as e:
                    self.errorString = 'r'
                    if self.debugLogging:
                        self.writeToDebugLog(f"[loadState] [{attempts}] No state file found for bank {b}. Error: {e}")
                    if e.errno == self._ENOENT:
                        # No state file exists, initialize the array with zeros
                        if self.initTest:
                            print('Initializing bank: ' + str(b))
                        self.CVR[b] = self._blankBank()
                        # Save the state file for faster loading on next boot
                        self.bankToSave = b
                        self.saveState()
                        loaded = True

                except Exception as e:
                    self.errorString = 'x'
                    if self.debugLogging:
                        self.writeToDebugLog(f"[loadState] [{attempts}] Exception when attempting to open previous state file for bank {b}. {e}")

                if not loaded:
                    # Sleep and increment attempt counter
                    sleep_ms(50)
                    attempts += 1

            if not loaded:
                # Keep the script usable with an empty bank; deliberately not saved here
                self.CVR[b] = self._blankBank()

    # Currently not used, but keeping in this script for future use
    def debugDumpCvr(self):
        '''Print every channel of every bank as bank:channel:values.'''
        for b in range(self.numCVRBanks + 1):
            for i in range(self.numCVR + 1):
                print(str(b) + ':' + str(i) + ':' + str(self.CVR[b][i]))

    def free(self, full=False):
        '''
        Return heap usage as a string.

        full: when False return only the free percentage, otherwise return
        total bytes, free bytes and the percentage.
        '''
        #gc.collect()
        F = gc.mem_free()
        A = gc.mem_alloc()
        T = F + A
        P = '{0:.2f}%'.format(F / T * 100)
        if not full:
            return P
        return 'Total:{0} Free:{1} ({2})'.format(T, F, P)

    def main(self):
        '''Main loop: read the bank knob, redraw the screen and handle clock timeout.'''
        while True:
            self.getCvBank()
            self.updateScreen()
            # If I have been running, then stopped for longer than reset_timeout, reset the steps and clock_step to 0
            if self.clockStep != 0 and ticks_diff(ticks_ms(), din.last_triggered()) > self.resetTimeout:
                # Leave the position alone while a recording is in progress
                if self.CvRecording[self.ActiveCvr] != 'true':
                    self.step = 0
                    self.clockStep = 0

    def getCvBank(self):
        '''Set the active bank from the position of knob 1.'''
        # Read CV Bank selection from knob 1
        self.ActiveBank = k1.read_position(self.numCVRBanks + 1)

    # Rotate log files to avoid filling up storage
    def rotateLog(self):
        '''
        Shift existing debug logs up by one number and drop the oldest.

        After this call self.currentLogFile names log number 1, which is the
        file writeToDebugLog appends to.
        '''
        self.logFileList = os.listdir()
        # Delete the oldest allowed logfile if it exists
        if self.maxLogFileName in self.logFileList:
            os.remove(self.maxLogFileName)
        # Rename other log files if they exist 4 becomes 5 etc
        self.logFileNum = self.maxLogFiles - 1
        while self.logFileNum > 0:
            name = self.logFilePrefix + str(self.logFileNum) + '.log'
            if name in self.logFileList:
                os.rename(name, self.logFilePrefix + str(self.logFileNum + 1) + '.log')
            self.logFileNum -= 1
        # Set explicitly so the name is valid even if the loop above never ran
        self.currentLogFile = self.logFilePrefix + '1.log'

    def writeToDebugLog(self, msg):
        '''
        Append a timestamped line to the current debug log.

        msg: text to log. Failures are printed and retried; they never raise.
        '''
        try:
            rtc = machine.RTC()
            timestamp = rtc.datetime()
            # Skip element 3 of the RTC tuple and use year..day plus the next three fields
            timestring = "%04d-%02d-%02d %02d:%02d:%02d" % (timestamp[0:3] + timestamp[4:7])
        except Exception:
            timestring = '0000-00-00 00:00:00'
        maxRetries = 6
        attempts = 0
        while attempts < maxRetries:
            try:
                attempts += 1
                with open(self.currentLogFile, 'a') as file:
                    # Attempt write data to state on disk, then break from while loop
                    # if the return (num bytes written) > 0
                    if file.write(timestring + ' ' + msg + '\n') > 0:
                        #self.errorString = ''
                        break
            except MemoryError as e:
                print(f'[{attempts}] Error: Memory allocation failed, retrying: {e}')
            except Exception as e:
                print(f'[{attempts}] Error writing to debug log. {e}')

    def showLoadingScreen(self, bank):
        '''Show the logo and a "Loading (bank)..." message on the display.'''
        # push the bytearray of the Rpi logo into a 32 x 32 framebuffer, then show on the screen
        buffer = bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00|?\x00\x01\x86@\x80\x01\x01\x80\x80\x01\x11\x88\x80\x01\x05\xa0\x80\x00\x83\xc1\x00\x00C\xe3\x00\x00~\xfc\x00\x00L'\x00\x00\x9c\x11\x00\x00\xbf\xfd\x00\x00\xe1\x87\x00\x01\xc1\x83\x80\x02A\x82@\x02A\x82@\x02\xc1\xc2@\x02\xf6>\xc0\x01\xfc=\x80\x01\x18\x18\x80\x01\x88\x10\x80\x00\x8c!\x00\x00\x87\xf1\x00\x00\x7f\xf6\x00\x008\x1c\x00\x00\x0c \x00\x00\x03\xc0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
        fb = framebuf.FrameBuffer(buffer, 32, 32, framebuf.MONO_HLSB)
        oled.fill(0)
        oled.blit(fb, 0, 0)
        oled.text(f'Loading ({bank})...', 40, 12, 1)
        oled.show()

    def updateScreen(self):
        '''Redraw the display: channel levels, recording status, bank:channel and step.'''
        # Clear the screen
        oled.fill(0)

        # Snapshot the step so every element of this frame shows the same position,
        # even if a clock edge arrives while drawing
        step = self.step
        bank = self.CVR[self.ActiveBank]

        # Visualize each CV channel as a box whose width follows the stored value:
        # three columns (42 px apart) by two rows (12 px apart)
        lPadding = 4
        for i in range(min(self.numCVR + 1, 6)):
            # oled.rect(x, y, width, height, colour)
            oled.rect(lPadding + 42 * (i % 3), 12 * (i // 3), int(bank[i][step] * 4), 11, 1)

        # Show 'Rec' if recording
        if self.CvRecording[self.ActiveCvr] == 'true':
            oled.text('REC', 71, 25, 1)
        elif self.CvRecording[self.ActiveCvr] == 'pending':
            #oled.text('. .', 71, 25, 1)
            oled.text('.' + self.errorString + '.', 71, 25, 1)
        else:
            oled.text(' ' + self.errorString + ' ', 71, 25, 1)

        # Active recording channel
        oled.text(str(self.ActiveBank + 1) + ':' + str(self.ActiveCvr + 1), 100, 25, 1)

        # Current step
        oled.rect(lPadding - 1, 26, 64, 6, 1)
        oled.fill_rect(lPadding - 1, 26, step, 6, 1)
        oled.show()
# Script entry point: build the recorder and hand control to its main loop
if __name__ == '__main__':
    CVecorder().main()