Skip to content

Commit

Permalink
v1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
actions-user committed Oct 8, 2023
1 parent 6d40ae4 commit 35cb847
Show file tree
Hide file tree
Showing 79 changed files with 1,370 additions and 18 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

This repository hosts a collection of services for the SIO (Sighthound.IO) ecosystem. Services are intended to run via the `./scripts/sh-services` script, which relies on docker-compose. However, docker-compose is not strictly necessary for configuring or using this repository.

![System architecture](docs/media/architecture.png)

The included services are as follow:
- SIO: The computer vision analytics engine.
- MCP: Media manager service, which includes a REST API and a cleaner. MCP relies on sharing media store folders with SIO service, and listens on AMQP message bus for media creation events, such as new video recording segments or event-driven jpeg images. It then provides the API access to that media (for documentation go to http://localhost:9097), and control its lifecycle.
Expand All @@ -15,8 +17,15 @@ The `./scripts/sh-services` script is a basic tool that triggers `docker-compose

In the turnkey scenario, each service is managed with an individual `docker-compose` configuration file, an optional (or autogenerated by sh-serviecs) `.env` file containing relevant environment variable, and a collection of service specific configuration file in conf subfolder. To assist in orchestrating the services collection and disjointed `docker-compose` and environment configuration, `sh-services` CLI utility was introduced.

![Folder Structure](docs/media/folders.png)

## Configuration Priority:

First, let's take a look on how the services work:

![Folder Structure](docs/media/services.png)


For example, if you have the following configuration files:
- default.env
- 0009-customer.env
Expand All @@ -33,6 +42,8 @@ The first file is given the highest priority, so it will overwrite any conflicti

This guide will help you set up SIO to point to a fake RTSP generated by live555 and start processing video.

![live555](docs/media/live555.png)

### Prerequisites (for non-dnncam devices)
On Sighthound DNNCam devices, services come preinstalled, and the device GUI interacts with it. If you are using a dnncam we suggest you rely on the GUI to configure/update services, though it's not a requirement. On other devices, you need to manually:

Expand Down
5 changes: 5 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# Release Notes

## v1.5.0
- services: Migrate services to its own services path
- README.md: Add visual documentation
- examples: Add VideoStreamsConsumer integrated deployment sample

## v1.4.2
- examples: Fixes issue with pip installation
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.4.2
v1.5.0
78 changes: 78 additions & 0 deletions deployment-examples/ClientLib/lib/AMQPListener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python
import pika, json
import traceback
import socket

class AMQPListener:
host = 'localhost'
exchange = 'anypipe'
routing_key = '#'
port = 5672

def __init__(self,conf):
self.queue_name = None
self.channel = None
self.connection = None
self.json_callback = None
self.host = conf.get("host", AMQPListener.host)
self.exchange = conf.get("exchange", AMQPListener.exchange)
self.routing_key = conf.get("routing_key", AMQPListener.routing_key)
self.port = conf.get("port", AMQPListener.port)
self.json_callback = lambda data: print(f"Received data {data}")

def set_callback(self, json_callback):
self.json_callback = json_callback

def connect(self):
if not self.connection:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host,port=self.port))
self.channel = self.connection.channel()

self.channel.exchange_declare(exchange=self.exchange, exchange_type='topic', durable=True)

def get_queue_name(self):
if not self.queue_name:
self.connect()
result = self.channel.queue_declare(queue='', exclusive=True)
self.queue_name = result.method.queue
print(f"Using queue name {self.queue_name}")
return self.queue_name

def callback(self,ch, method, properties, body):
try:
data = json.loads(body)
if self.json_callback:
self.json_callback(data)
except Exception as e:
print(f"Caught exception {e} handling callback")
traceback.print_exc()

def start(self):
"""
Start the amqp listener, setting up a callback at @param json_callback
function with single argument representing a JSON payload.
"""
print(f"Starting AMQP Listener on {self.host}:{self.port}")
try:
self.connect()
except socket.gaierror as e:
traceback.print_exc()
print(f"Error connecting to AMQP host: {self.host}:{self.port}. {e}.")
print("Please check your AMQP configuration. Did you start RabbitMQ?")
print("You can also use the environment variables AMQP_HOST and AMQP_PORT to configure the host and port.")
return
except Exception as e:
traceback.print_exc()
print(f"Caught exception '{e}' connecting to AMQP")
return
queue_name = self.get_queue_name()
self.channel.queue_bind(exchange=self.exchange, queue=queue_name,
routing_key=self.routing_key)
self.channel.basic_consume(
queue=queue_name, on_message_callback=self.callback, auto_ack=True)
print(' [*] Listening for AMQP messages. To exit press CTRL+C')
self.channel.start_consuming()

def stop(self):
self.channel.stop_consuming()
162 changes: 162 additions & 0 deletions deployment-examples/ClientLib/lib/MCP.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import requests
from PIL import Image
import numpy as np
from io import BytesIO

class MCPClient:
def __init__(self, conf):
self.host = conf.get("host", "mcp")
self.port = conf.get("port", 9097)
self.user = conf.get("username", None)
self.password = conf.get("password", None)
if self.user and self.password:
print(f"Connecting to mcp://{self.user}:*****@{self.host}:{self.port}")
else:
print(f"Connecting to mcp://{self.host}:{self.port}")

def get(self, url):
if self.user and self.password:
auth = (self.user, self.password)
else:
auth = None
response = requests.get(url, auth=auth)

if response.status_code == 401:
raise Exception("Unauthorized")
else:
return response

# curl mcp:9097/hlsfs/source
def list_sources(self):
url = f"http://{self.host}:{self.port}/hlsfs/source"
return self.get(url).json()

# curl mcp:9097/hlsfs/source/<source_id>/stats
def get_stats(self, source_id):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/stats"
return self.get(url).json()

# curl mcp:9097/hlsfs/source/<source_id>/image/<image>
def get_image(self, source_id, image):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/image/{image}"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("Image not found")
else:
raise Exception("Error downloading image", response.status_code)
else:
# Convert image to numpy array
img = Image.open(BytesIO(response.content))
arr = np.array(img)
return arr

# curl mcp:9097/hlsfs/source/<source_id>/segment/<video>
def download_video(self, source_id, video, filepath):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/segment/{video}"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception(f"Video {video} not found for source {source_id}")
else:
raise Exception(f"Error downloading video {video} to {filepath} for source {source_id}", response.status_code)
else:
# Save image to file
with open(filepath, 'wb') as f:
f.write(response.content)

# curl mcp:9097/hlsfs/source/<source_id>/segment/<segment>
def get_segment(self, source_id, segment):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/segment/{segment}"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("Segment not found")
else:
raise Exception("Error downloading Segment", response.status_code)
else:
return BytesIO(response.content)

# curl mcp:9097/hlsfs/source/<source_id>/segment/<image>
def download_image(self, source_id, image, filepath):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/segment/{image}"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("Image not found")
else:
raise Exception("Error downloading image", response.status_code)
else:
# Save image to file
with open(filepath, 'wb') as f:
f.write(response.content)

# curl mcp:9097/hlsfs/source/<source_id>/image
def list_images(self, source_id):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/image"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("Image not found")
else:
raise Exception("Error downloading image", response.status_code)
else:
return response.json()

# curl mcp:9097/hlsfs/source/<source_id>/latest-image
def get_latest_image(self, source_id):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/latest-image"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("Image not found")
else:
raise Exception("Error downloading image", response.status_code)
else:
# Convert image to numpy array
img = Image.open(BytesIO(response.content))
arr = np.array(img)
return arr

# curl mcp:9097/hlsfs/source/<source_id>/live
def get_live_m3u8(self, source_id):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/live"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("M3U8 not found")
else:
raise Exception("Error downloading HLS", response.status_code)
else:
return response.text

# curl mcp:9097/hlsfs/source/<source_id>/<start>..<end>.m3u8
def get_m3u8(self, source_id, start, end):
url = f"http://{self.host}:{self.port}/hlsfs/source/{source_id}/{start}..{end}.m3u8"
response = self.get(url)

if response.status_code != 200:
if response.status_code == 404:
raise Exception("M3U8 not found:", url)
else:
raise Exception("Error downloading HLS:", url, ":", response.status_code)
else:
return response.text

# curl mcp:9097/hlsfs/source/<source_id>/<start>..<end>.m3u8
def get_m3u8_playlist(self, source_id, start, end):
import m3u8
m3u8_content = self.get_m3u8(source_id, start, end)
# Remove all #EXT-UNIX-TIMESTAMP-MS lines from the m3u8 file
# m3u8 library doesn't support this tag
m3u8_content = '\n'.join([line for line in m3u8_content.split('\n') if not line.startswith("#EXT-UNIX-TIMESTAMP-MS")])
return m3u8.loads(m3u8_content)


Loading

0 comments on commit 35cb847

Please sign in to comment.