Skip to content

Commit

Permalink
separate scheduler tasks for usb and ble
Browse files Browse the repository at this point in the history
  • Loading branch information
regicidalplutophage committed Dec 22, 2024
1 parent 3a63848 commit 911cd4a
Showing 1 changed file with 53 additions and 38 deletions.
91 changes: 53 additions & 38 deletions kmk/hid.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from storage import getmount

from kmk.keys import ConsumerKey, KeyboardKey, ModifierKey, MouseKey
from kmk.scheduler import create_task
from kmk.scheduler import cancel_task, create_task
from kmk.utils import Debug, clamp

try:
Expand Down Expand Up @@ -60,17 +60,19 @@ class AbstractHID:
report_bytes_nkro = 17
REPORT_BYTES = report_bytes_default
hid_devices = {}
connected = None
hid_ready = False

def __init__(self, **kwargs):
self._nkro = False
self._mouse = True
self._pan = False
self.watchdog()
self.start_watchdog()
self.find_devices()
self.setup_keyboard_hid()
self.setup_consumer_control()
self.setup_mouse_hid()

def _connected(self):
return False
return True

def show_debug(self):
if self._nkro:
Expand Down Expand Up @@ -147,23 +149,6 @@ def setup_mouse_hid(self):
except KeyError:
self._mouse = False

def watchdog(self):
if self.connected != self._connected():
self.connected = self._connected()
self.find_devices()
self.setup_keyboard_hid()
self.setup_consumer_control()
self.setup_mouse_hid()
self.post_watchdog()
if debug.enabled:
self.show_debug()

def post_watchdog(self):
return

def start_watchdog(self):
return

def __repr__(self):
return f'{self.__class__.__name__}(REPORT_BYTES={self.REPORT_BYTES})'

Expand Down Expand Up @@ -316,15 +301,32 @@ def __init__(self, **kwargs):
self.hid = usb_hid
self.hid_devices = self.hid.devices
super().__init__(**kwargs)
self._setup_task = self.wait_until_connected()

def test_reports(self):
if self._connected():
try:
self.hid_ready = True
self.setup_keyboard_hid()
self.setup_consumer_control()
self.setup_mouse_hid()
cancel_task(self._setup_task)
self._setup_task = None
if debug.enabled:
self.show_debug()
self.hid_ready = True
except OSError as e:
if debug.enabled:
debug(type(e), ':', e)

def wait_until_connected(self, period_ms=200):
return create_task(self.test_reports, period_ms=period_ms)

def _connected(self):
return supervisor.runtime.usb_connected

def start_watchdog(self, period_ms=200):
return create_task(self.watchdog, period_ms=period_ms)

def hid_send(self, evt):
if not self.connected:
if not self.hid_ready or not self._connected():
return

# int, can be looked up in HIDReportTypes
Expand All @@ -337,6 +339,7 @@ class BLEHID(AbstractHID):
BLE_APPEARANCE_HID_KEYBOARD = const(961)
# Hardcoded in CPy
MAX_CONNECTIONS = const(2)
ble_connected = False

def __init__(self, ble_name=str(getmount('/').label), **kwargs):
self.ble_name = ble_name
Expand All @@ -346,24 +349,36 @@ def __init__(self, ble_name=str(getmount('/').label), **kwargs):
self.hid_devices = self.hid.devices
self.hid.protocol_mode = 0 # Boot protocol
super().__init__(**kwargs)
self.start_ble_monitor()

def _connected(self):
return self.ble.connected

def post_watchdog(self):
# Security-wise this is not right. While you're away someone turns
# on your keyboard and they can pair with it nice and clean and then
# listen to keystrokes.
# On the other hand we don't have LESC so it's like shouting your
# keystrokes in the air
if not self.connected:
self.start_advertising()

def start_watchdog(self, period_ms=200):
return create_task(self.watchdog, period_ms=period_ms)
def ble_monitor(self):
if self.ble_connected != self._connected():
self.ble_connected = self._connected()
if self._connected():
self.find_devices()
self.hid_ready = True
if debug.enabled:
debug('BLE connected')
else:
self.hid_ready = False
# Security-wise this is not right. While you're away someone turns
# on your keyboard and they can pair with it nice and clean and then
# listen to keystrokes.
# On the other hand we don't have LESC so it's like shouting your
self.hid_ready = False
# keystrokes in the air
self.start_advertising()
if debug.enabled:
debug('BLE disconnected')

def start_ble_monitor(self, period_ms=200):
return create_task(self.setup, period_ms=period_ms)

def hid_send(self, evt):
if not self.ble.connected:
if not self.hid_ready or not self._connected():
return

# int, can be looked up in HIDReportTypes
Expand Down

0 comments on commit 911cd4a

Please sign in to comment.