diff --git a/README.md b/README.md index 9b001a1..55f7f5b 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ This repository hosts a collection of services for the SIO (Sighthound.IO) ecosystem. Services are intended to run via the `./scripts/sh-services` script, which relies on docker-compose. However, docker-compose is not strictly necessary for configuring or using this repository. +![System architecture](docs/media/architecture.png) + The included services are as follow: - SIO: The computer vision analytics engine. - MCP: Media manager service, which includes a REST API and a cleaner. MCP relies on sharing media store folders with SIO service, and listens on AMQP message bus for media creation events, such as new video recording segments or event-driven jpeg images. It then provides the API access to that media (for documentation go to http://localhost:9097), and control its lifecycle. @@ -15,8 +17,15 @@ The `./scripts/sh-services` script is a basic tool that triggers `docker-compose In the turnkey scenario, each service is managed with an individual `docker-compose` configuration file, an optional (or autogenerated by sh-serviecs) `.env` file containing relevant environment variable, and a collection of service specific configuration file in conf subfolder. To assist in orchestrating the services collection and disjointed `docker-compose` and environment configuration, `sh-services` CLI utility was introduced. +![Folder Structure](docs/media/folders.png) + ## Configuration Priority: +First, let's take a look on how the services work: + +![Folder Structure](docs/media/services.png) + + For example, if you have the following configuration files: - default.env - 0009-customer.env @@ -33,6 +42,8 @@ The first file is given the highest priority, so it will overwrite any conflicti This guide will help you set up SIO to point to a fake RTSP generated by live555 and start processing video. +![live555](docs/media/live555.png) + ### Prerequisites (for non-dnncam devices) On Sighthound DNNCam devices, services come preinstalled, and the device GUI interacts with it. If you are using a dnncam we suggest you rely on the GUI to configure/update services, though it's not a requirement. On other devices, you need to manually: diff --git a/RELEASE.md b/RELEASE.md index fe244a6..fb31973 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,9 @@ # Release Notes +## v1.5.0 +- services: Migrate services to its own services path +- README.md: Add visual documentation +- examples: Add VideoStreamsConsumer integrated deployment sample + ## v1.4.2 - examples: Fixes issue with pip installation diff --git a/VERSION b/VERSION index c432e90..2e7bd91 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.4.2 +v1.5.0 diff --git a/deployment-examples/ClientLib/lib/AMQPListener.py b/deployment-examples/ClientLib/lib/AMQPListener.py new file mode 100644 index 0000000..650588a --- /dev/null +++ b/deployment-examples/ClientLib/lib/AMQPListener.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +import pika, json +import traceback +import socket + +class AMQPListener: + host = 'localhost' + exchange = 'anypipe' + routing_key = '#' + port = 5672 + + def __init__(self,conf): + self.queue_name = None + self.channel = None + self.connection = None + self.json_callback = None + self.host = conf.get("host", AMQPListener.host) + self.exchange = conf.get("exchange", AMQPListener.exchange) + self.routing_key = conf.get("routing_key", AMQPListener.routing_key) + self.port = conf.get("port", AMQPListener.port) + self.json_callback = lambda data: print(f"Received data {data}") + + def set_callback(self, json_callback): + self.json_callback = json_callback + + def connect(self): + if not self.connection: + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host,port=self.port)) + self.channel = self.connection.channel() + + self.channel.exchange_declare(exchange=self.exchange, exchange_type='topic', durable=True) + + def get_queue_name(self): + if not self.queue_name: + self.connect() + result = self.channel.queue_declare(queue='', exclusive=True) + self.queue_name = result.method.queue + print(f"Using queue name {self.queue_name}") + return self.queue_name + + def callback(self,ch, method, properties, body): + try: + data = json.loads(body) + if self.json_callback: + self.json_callback(data) + except Exception as e: + print(f"Caught exception {e} handling callback") + traceback.print_exc() + + def start(self): + """ + Start the amqp listener, setting up a callback at @param json_callback + function with single argument representing a JSON payload. + """ + print(f"Starting AMQP Listener on {self.host}:{self.port}") + try: + self.connect() + except socket.gaierror as e: + traceback.print_exc() + print(f"Error connecting to AMQP host: {self.host}:{self.port}. {e}.") + print("Please check your AMQP configuration. Did you start RabbitMQ?") + print("You can also use the environment variables AMQP_HOST and AMQP_PORT to configure the host and port.") + return + except Exception as e: + traceback.print_exc() + print(f"Caught exception '{e}' connecting to AMQP") + return + queue_name = self.get_queue_name() + self.channel.queue_bind(exchange=self.exchange, queue=queue_name, + routing_key=self.routing_key) + self.channel.basic_consume( + queue=queue_name, on_message_callback=self.callback, auto_ack=True) + print(' [*] Listening for AMQP messages. To exit press CTRL+C') + self.channel.start_consuming() + + def stop(self): + self.channel.stop_consuming() diff --git a/deployment-examples/ClientLib/lib/MCP.py b/deployment-examples/ClientLib/lib/MCP.py new file mode 100644 index 0000000..33672ae --- /dev/null +++ b/deployment-examples/ClientLib/lib/MCP.py @@ -0,0 +1,162 @@ +import requests +from PIL import Image +import numpy as np +from io import BytesIO + +class MCPClient: + def __init__(self, conf): + self.host = conf.get("host", "mcp") + self.port = conf.get("port", 9097) + self.user = conf.get("username", None) + self.password = conf.get("password", None) + if self.user and self.password: + print(f"Connecting to mcp://{self.user}:*****@{self.host}:{self.port}") + else: + print(f"Connecting to mcp://{self.host}:{self.port}") + + def get(self, url): + if self.user and self.password: + auth = (self.user, self.password) + else: + auth = None + response = requests.get(url, auth=auth) + + if response.status_code == 401: + raise Exception("Unauthorized") + else: + return response + + # curl mcp:9097/hlsfs/source + def list_sources(self): + url = f"http://{self.host}:{self.port}/hlsfs/source" + return self.get(url).json() + + # curl mcp:9097/hlsfs/source//stats + def get_stats(self, source_id): + url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/stats" + return self.get(url).json() + + # curl mcp:9097/hlsfs/source//image/ + def get_image(self, source_id, image): + url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/image/{image}" + response = self.get(url) + + if response.status_code != 200: + if response.status_code == 404: + raise Exception("Image not found") + else: + raise Exception("Error downloading image", response.status_code) + else: + # Convert image to numpy array + img = Image.open(BytesIO(response.content)) + arr = np.array(img) + return arr + + # curl mcp:9097/hlsfs/source//segment/