Skip to content

Commit

Permalink
Added use case config, fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
berticus2016 committed Dec 4, 2022
1 parent dd83f29 commit 159b3b0
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 69 deletions.
28 changes: 20 additions & 8 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def onLost(interface):
username = config["username"] if "username" in config else None
password = config["password"] if "password" in config else None

logger.info(f"Connected to MQTT {config['name']}")

if client_id:
mqttc = mqtt.Client(client_id)
else:
Expand All @@ -128,25 +130,27 @@ def onLost(interface):
if username and password:
mqttc.username_pw_set(username, password)

mqtt_servers[config["name"]] = mqttc

def on_connect(mqttc, obj, flags, rc):
logger.debug(f"Connected to MQTT {config['name']}")

def on_message(mqttc, obj, msg):
orig_packet = msg.payload.decode()

logger.debug(f"MQTT {config['name']}: on_message")
logger.debug(f"MQTT {config['name']}: {orig_packet}")

if "pipelines" not in config:
logger.warning(f"MQTT {config['name']}: no pipeline")
return

p = plugins["packet_filter"]
pipeline_packet = p.do_action(orig_packet)

for pipeline, pipeline_plugins in config["pipelines"].items():

packet = orig_packet
packet = pipeline_packet

logger.debug(f"MQTT {config['name']} pipeline {pipeline} started")
logger.debug(f"MQTT {config['name']} pipeline {pipeline} initiated")
if not packet:
continue

Expand Down Expand Up @@ -179,18 +183,26 @@ def on_subscribe(mqttc, obj, mid, granted_qos):
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe

mqtt_servers[config["name"]] = mqttc

import ssl

if "insecure" in config and config["insecure"]:
mqttc.tls_set(cert_reqs=ssl.CERT_NONE)
mqttc.tls_insecure_set(True)

mqttc.connect(config["server"], config["port"], 60)
try:
logger.debug(f"Connecting to MQTT {config['server']}")

mqttc.connect(config["server"], config["port"], 60)

if "topic" in config:
mqttc.subscribe(config["topic"], 0)
if "topic" in config:
mqttc.subscribe(config["topic"], 0)

mqttc.loop_start()
mqttc.loop_start()
except Exception as e:
logger.error(f"MQTT {config['name']} could not start: {e}")
pass

while True:
time.sleep(1000)
Expand Down
140 changes: 79 additions & 61 deletions plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
plugins = {}


class Plugin:
class Plugin(object):
def __init__(self) -> None:
self.logger.setLevel(logging.INFO)

def configure(self, devices, mqtt_servers, config):
self.config = config
self.devices = devices
Expand All @@ -30,25 +33,42 @@ def do_action(self, packet):
class PacketFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.filter.packet")

def strip_raw(self, dict_obj):
if type(dict_obj) is not dict:
return dict_obj
def strip_raw(self, data):
if type(data) is not dict:
return data

if "raw" in data:
del data["raw"]

if "raw" in dict_obj:
del dict_obj["raw"]
for k, v in data.items():
data[k] = self.strip_raw(v)

for k, v in dict_obj.items():
dict_obj[k] = self.strip_raw(v)
return data

return dict_obj
def normalize(self, dict_obj):
"""
Packets are either a dict, string dict or string
"""
if type(dict_obj) is not dict:
try:
dict_obj = json.loads(dict_obj)
except:
dict_obj = {"decoded": {"text": dict_obj}}

return self.strip_raw(dict_obj)

def do_action(self, packet):
packet = self.strip_raw(packet)
self.logger.debug(f"Before normalization: {packet}")
packet = self.normalize(packet)

if "decoded" in packet and "payload" in packet["decoded"]:
packet["decoded"]["payload"] = base64.b64encode(
packet["decoded"]["payload"]
).decode("utf-8")
if type(packet["decoded"]["payload"]) is bytes:
text = packet["decoded"]["payload"]
packet["decoded"]["payload"] = base64.b64encode(
packet["decoded"]["payload"]
).decode("utf-8")

self.logger.debug(f"After normalization: {packet}")

return packet

Expand Down Expand Up @@ -90,15 +110,17 @@ def do_action(self, packet):
)
return None

if text and "disallow" in self.config["message"]:
matches = False
for disallow_regex in self.config["message"]["disallow"]:
if not matches and re.search(disallow_regex, text):
matches = True
if "disallow" in self.config["message"]:
matches = False
for disallow_regex in self.config["message"]["disallow"]:
if not matches and re.search(disallow_regex, text):
matches = True

if matches:
self.logger.debug(f"Dropped because it matches message disallow filter")
return None
if matches:
self.logger.debug(
f"Dropped because it matches message disallow filter"
)
return None

filters = {
"app": packet["decoded"]["portnum"],
Expand All @@ -116,7 +138,7 @@ def do_action(self, packet):
and value not in filter_val["allow"]
):
self.logger.debug(
f"Dropped because it doesn't match {filter_key} allow filter"
f"Dropped because {value} doesn't match {filter_key} allow filter"
)
return None

Expand All @@ -126,7 +148,7 @@ def do_action(self, packet):
and value in filter_val["disallow"]
):
self.logger.debug(
f"Dropped because it matches {filter_key} disallow filter"
f"Dropped because {value} matches {filter_key} disallow filter"
)
return None

Expand Down Expand Up @@ -205,13 +227,6 @@ class WebhookPlugin(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.plugin.webhook")

def do_action(self, packet):
if type(packet) is not dict:
try:
packet = json.loads(packet)
except:
self.logger.warning("Packet is not dict")
return packet

if "active" in self.config and not self.config["active"]:
return packet

Expand Down Expand Up @@ -281,9 +296,16 @@ def do_action(self, packet):

mqtt_server = self.mqtt_servers[self.config["name"]]

packet_payload = packet if type(packet) is str else json.dumps(packet)
if not mqtt_server.is_connected():
self.logger.error("Not sent, not connected")
return

packet_message = json.dumps(packet)

message = self.config["message"] if "message" in self.config else packet_payload
if "message" in self.config:
message = self.config["message"].replace("{MSG}", packet["decoded"]["text"])
else:
message = packet_message

info = mqtt_server.publish(self.config["topic"], message)
info.wait_for_publish()
Expand Down Expand Up @@ -361,48 +383,36 @@ class RadioMessagePlugin(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.plugin.send")

def do_action(self, packet):

if type(packet) is not dict:
try:
packet = json.loads(packet)
except:
self.logger.error("Packet is not a dict")
return packet

if self.config["device"] not in self.devices:
self.logger.error(f"Missing interface for device {self.config['device']}")
return packet

if "to" not in packet and "toId" not in packet:
self.logger.debug("Not a message")
return packet

# Broadcast messages or specific
if (
"node_mapping" in self.config
and packet["to"] in self.config["node_mapping"]
):
destinationId = self.config["node_mapping"][packet["to"]]
else:
destinationId = packet["to"] if "to" in packet else packet["toId"]
destinationId = None

if "to" in self.config:
destinationId = self.config["to"]
elif "toId" in self.config:
destinationId = self.config["toId"]
elif "node_mapping" in self.config and "to" in packet:
destinationId = self.config["node_mapping"][packet["to"]]
elif "to" in packet:
destinationId = packet["to"]
elif "toId" in packet:
destinationId = packet["toId"]

device_name = self.config["device"]

if device_name not in self.devices:
self.logger.warning(f"No such radio device: {device_name}")
if not destinationId:
self.logger.error("Missing 'to' property in config or packet")
return packet

device_name = self.config["device"]

device = self.devices[device_name]

self.logger.debug(f"Sending packet to Radio {device_name}")
# Not a radio packet
if "decoded" in packet and "text" in packet["decoded"] and "from" not in packet:
self.logger.debug(f"Sending text to Radio {device_name}")
device.sendText(text=packet["decoded"]["text"], destinationId=destinationId)

if "message" in self.config and self.config["message"]:
device.sendText(text=self.config["message"], destinationId=destinationId)
elif (
"lat" in self.config
and self.config["lat"] > 0
Expand All @@ -413,20 +423,28 @@ def do_action(self, packet):
lng = self.config["lng"]
altitude = self.config["alt"] if "alt" in self.config else 0

self.logger.debug(f"Sending position to Radio {device_name}")

device.sendPosition(
latitude=lat,
longitude=lng,
altitude=altitude,
destinationId=destinationId,
)
else:
elif (
"decoded" in packet
and "payload" in packet["decoded"]
and "portnum" in packet["decoded"]
):
meshPacket = mesh_pb2.MeshPacket()
meshPacket.channel = 0
meshPacket.decoded.payload = base64.b64decode(packet["decoded"]["payload"])
meshPacket.decoded.portnum = int(packet["decoded"]["portnum"])
meshPacket.decoded.portnum = packet["decoded"]["portnum"]
meshPacket.decoded.want_response = False
meshPacket.id = device._generatePacketId()

self.logger.debug(f"Sending packet to Radio {device_name}")

device._sendPacket(meshPacket=meshPacket, destinationId=destinationId)

return packet
Expand Down
25 changes: 25 additions & 0 deletions use-cases/mqtt_bridge/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
devices:
- name: radio1
tcp: 192.168.86.27
mqtt_servers:
- name: external
server: broker.hivemq.com
port: 1883
topic: meshtastic/radio-network1
pipelines:
mqtt-to-radio:
- radio_message_plugin:
device: radio1
to: "^all"
pipelines:
pipeline1:
- debugger:
log_level: debug
radio-to-mqtt:
- message_filter:
app:
allow:
- "TEXT_MESSAGE_APP"
- mqtt_plugin:
name: external
topic: meshtastic/radio-network1

0 comments on commit 159b3b0

Please sign in to comment.