Skip to content

Commit

Permalink
Merge pull request #92 from Tastyep/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
Tastyep authored Nov 18, 2020
2 parents 56de415 + c6a5e64 commit 0ee2591
Show file tree
Hide file tree
Showing 36 changed files with 832 additions and 388 deletions.
5 changes: 5 additions & 0 deletions OpenCast.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

ROOT="$(cd "$(dirname "$0")" && pwd)"

# Source profile file as poetry and nvm use it to modify the PATH
# This is likely to be done by the display manager, but not always (lightdm).
# shellcheck source=/dev/null
source ~/.profile

# shellcheck source=script/cli_builder.sh
source "$ROOT/script/cli_builder.sh"

Expand Down
13 changes: 7 additions & 6 deletions OpenCast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from .app.facade import AppFacade
from .app.service.module import ServiceModule
from .app.tool.json_encoder import ModelEncoder
from .config import ConfigError, config
from .config import ConfigError
from .config import config as conf
from .domain.service.factory import ServiceFactory
from .domain.service.identity import IdentityService
from .infra.data.manager import DataManager, StorageType
Expand All @@ -27,7 +28,7 @@

def run_server(logger, infra_facade):
try:
infra_facade.server.start(config["server.host"], config["server.port"])
infra_facade.server.start(conf["server.host"], conf["server.port"])
except Exception as e:
logger.error(
"Server exception caught", error=e, traceback=traceback.format_exc()
Expand Down Expand Up @@ -58,12 +59,12 @@ def main(argv=None):
init_logging(__name__)

try:
config.load_from_file("{}/config.yml".format(app_path))
conf.load_from_file("{}/config.yml".format(app_path))
except ConfigError:
return

# Get and update the log level
logging.getLogger(__name__).setLevel(config["log.level"])
logging.getLogger(__name__).setLevel(conf["log.level"])
logger = structlog.get_logger(__name__)

# TODO: make worker count configurable
Expand All @@ -77,15 +78,15 @@ def main(argv=None):
data_manager = DataManager(repo_factory)
data_facade = data_manager.connect(
StorageType.JSON,
path=config["database.file"],
path=conf["database.file"],
indent=4,
separators=(",", ": "),
cls=ModelEncoder,
)

io_factory = IoFactory()
media_factory = MediaFactory(
VlcInstance(), ThreadPoolExecutor(config["downloader.max_concurrency"])
VlcInstance(), ThreadPoolExecutor(conf["downloader.max_concurrency"])
)
infra_facade = InfraFacade(io_factory, media_factory, infra_service_factory)

Expand Down
2 changes: 1 addition & 1 deletion OpenCast/app/command/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def observe(self, cmd_cls, handler):

def dispatch(self, cmd):
def impl():
self._logger.debug(type(cmd).__name__, cmd=cmd)
self._logger.info(type(cmd).__name__, cmd=cmd)
cmd_id = id(type(cmd))
if cmd_id in self._handlers_map:
handlers = self._handlers_map[cmd_id]
Expand Down
6 changes: 6 additions & 0 deletions OpenCast/app/controller/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ def _bad_request(self, message: str = None, details: dict = None):
def _not_found(self):
return self._make_response(404, None)

def _internal_error(self, message: str = None, details: dict = None):
body = None
if message is not None:
body = {"error": {"message": message, "detail": details}}
return self._make_response(500, body)

def _make_response(self, status, body):
return self._server.make_json_response(status, body, self._model_dumps)

Expand Down
6 changes: 6 additions & 0 deletions OpenCast/app/controller/player_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ async def stream(self, req):

if self._source_service.is_playlist(source):
sources = self._source_service.unfold(source)
if not sources:
return self._internal_error("Could not unfold the playlist URL")

videos = [
Video(IdentityService.id_video(source), source) for source in sources
]
Expand All @@ -78,6 +81,9 @@ async def queue(self, req):

if self._source_service.is_playlist(source):
sources = self._source_service.unfold(source)
if not sources:
return self._internal_error("Could not unfold the playlist URL")

videos = [
Video(IdentityService.id_video(source), source) for source in sources
]
Expand Down
2 changes: 1 addition & 1 deletion OpenCast/app/service/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class PlayerService(Service):
def __init__(self, app_facade, data_facade, media_factory):
logger = structlog.get_logger(__name__)
super().__init__(app_facade, logger, self, player_cmds)
super().__init__(app_facade, logger, player_cmds)

self._observe_event(PlayerEvt.PlayerCreated)

Expand Down
2 changes: 1 addition & 1 deletion OpenCast/app/service/playlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class PlaylistService(Service):
def __init__(self, app_facade, service_factory, data_facade):
logger = structlog.get_logger(__name__)
super().__init__(app_facade, logger, self, playlist_cmds)
super().__init__(app_facade, logger, playlist_cmds)

self._observe_event(VideoEvt.VideoDeleted)

Expand Down
22 changes: 7 additions & 15 deletions OpenCast/app/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@


class Service:
def __init__(self, app_facade, logger, derived, cmd_module, evt_module=None):
def __init__(self, app_facade, logger, cmd_module, evt_module=None):
self._cmd_dispatcher = app_facade.cmd_dispatcher
self._evt_dispatcher = app_facade.evt_dispatcher
self._logger = logger
self.__derived = derived
self._observe(cmd_module, self._observe_command)
if evt_module is not None:
self._observe(evt_module, partial(self._observe_event, None))
Expand Down Expand Up @@ -45,13 +44,9 @@ def _dispatch_cmd_to_handler(self, cmd):
self._logger.error("Repo error", cmd=cmd, error=e)
try_count -= 1
except Exception as e:
self._logger.error(
"Operation error",
cmd=cmd,
error=e,
traceback=traceback.format_exc(),
self._abort_operation(
cmd.id, str(e), cmd=cmd, traceback=traceback.format_exc()
)
self._abort_operation(cmd.id, str(e))
return

def _dispatch_evt_to_handler(self, evt):
Expand All @@ -60,15 +55,12 @@ def _dispatch_evt_to_handler(self, evt):
getattr(self, handler_name)(evt)
return
except Exception as e:
self._logger.error(
"Operation error",
evt=evt,
error=e,
traceback=traceback.format_exc(),
self._abort_operation(
evt.id, str(e), evt=evt, traceback=traceback.format_exc()
)
self._abort_operation(evt.id, str(e))

def _abort_operation(self, cmd_id: Id, error: str):
def _abort_operation(self, cmd_id: Id, error: str, **logging_attrs):
self._logger.error(error, **logging_attrs)
self._evt_dispatcher.dispatch(OperationError(cmd_id, error))

def _start_transaction(self, repo, cmd_id, impl, *args):
Expand Down
8 changes: 4 additions & 4 deletions OpenCast/app/service/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
class VideoService(Service):
def __init__(self, app_facade, service_factory, data_facade, media_factory):
logger = structlog.get_logger(__name__)
super().__init__(app_facade, logger, self, video_cmds)
super().__init__(app_facade, logger, video_cmds)
self._video_repo = data_facade.video_repo
self._downloader = media_factory.make_downloader(app_facade.evt_dispatcher)
self._source_service = service_factory.make_source_service(
Expand All @@ -34,7 +34,7 @@ def impl(ctx, metadata):
metadata = self._source_service.pick_stream_metadata(cmd.source)

if metadata is None:
self._abort_operation(cmd.id, "Can't fetch metadata")
self._abort_operation(cmd.id, "Unavailable metadata", cmd=cmd)
return

self._start_transaction(self._video_repo, cmd.id, impl, metadata)
Expand Down Expand Up @@ -66,7 +66,7 @@ def stream_fetched(ctx, video, link):
if video.streamable():
link = self._source_service.fetch_stream_link(video.source)
if link is None:
self._abort_operation(cmd.id, "Could not fetch the streaming URL")
self._abort_operation(cmd.id, "Unavailable stream URL", cmd=cmd)
return

self._start_transaction(
Expand All @@ -82,7 +82,7 @@ def impl(ctx):
self._start_transaction(self._video_repo, cmd.id, impl)

def abort_operation(evt):
self._abort_operation(cmd.id, evt.error)
self._abort_operation(cmd.id, evt.error, cmd=cmd)

video.location = str(Path(cmd.output_directory) / f"{video.title}.mp4")
self._evt_dispatcher.observe_result(
Expand Down
4 changes: 2 additions & 2 deletions OpenCast/app/workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def __init__(
logger = structlog.get_logger(__name__)
super().__init__(
logger,
self,
id,
app_facade,
initial=InitWorkflow.States.INITIAL,
Expand Down Expand Up @@ -80,7 +79,8 @@ def on_enter_PURGING_VIDEOS(self, *_):
self._missing_videos = [
video.id
for video in videos
if not (video.streamable() or Path(video.location).exists())
if video.location is None
or not (video.streamable() or Path(video.location).exists())
]
if not self._missing_videos:
self.to_COMPLETED()
Expand Down
4 changes: 2 additions & 2 deletions OpenCast/app/workflow/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def on_completion(_):

with self._lock:
if self.is_running(workflow.id):
self._logger.debug("workflow already active", workflow=workflow)
self._logger.info("workflow already active", workflow=workflow)
return False

self._workflow_ids.append(workflow.id)
Expand All @@ -36,6 +36,6 @@ def on_completion(_):
times=1,
)

self._logger.debug("Starting workflow", workflow=workflow)
self._logger.info("Starting workflow", workflow=workflow)
workflow.start(*args, **kwargs)
return True
4 changes: 0 additions & 4 deletions OpenCast/app/workflow/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def __init__(
logger = structlog.get_logger(__name__)
super().__init__(
logger,
self,
id,
app_facade,
initial=QueueVideoWorkflow.States.INITIAL,
Expand Down Expand Up @@ -117,7 +116,6 @@ def __init__(
logger = structlog.get_logger(__name__)
super().__init__(
logger,
self,
id,
app_facade,
initial=StreamVideoWorkflow.States.INITIAL,
Expand Down Expand Up @@ -175,7 +173,6 @@ def __init__(self, id, app_facade, data_facade, video: Video):
logger = structlog.get_logger(__name__)
super().__init__(
logger,
self,
id,
app_facade,
initial=StreamVideoWorkflow.States.INITIAL,
Expand Down Expand Up @@ -247,7 +244,6 @@ def __init__(
logger = structlog.get_logger(__name__)
super().__init__(
logger,
self,
id,
app_facade,
initial=StreamVideoWorkflow.States.INITIAL,
Expand Down
31 changes: 17 additions & 14 deletions OpenCast/app/workflow/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,32 @@ class States(Enum):
CREATING = auto()
RETRIEVING = auto()
PARSING = auto()
FINALISING = auto()
SUB_RETRIEVING = auto()
COMPLETED = auto()
DELETING = auto()
ABORTED = auto()

# Trigger - Source - Dest - Conditions - Unless - Before - After - Prepare
transitions = [
["_create", States.INITIAL, States.COMPLETED, "is_complete"], # noqa: E501
["_create", States.INITIAL, States.CREATING],
["_video_created", States.CREATING, States.RETRIEVING],
["_video_retrieved", States.RETRIEVING, States.COMPLETED, "is_stream"], # noqa: E501
["_video_retrieved", States.RETRIEVING, States.PARSING],
["_video_parsed", States.PARSING, States.FINALISING],
["_video_subtitle_fetched", States.FINALISING, States.COMPLETED],

["_operation_error", States.CREATING, States.ABORTED],
["_operation_error", '*', States.DELETING],
["_video_deleted", States.DELETING, States.ABORTED],
["_create", States.INITIAL, States.COMPLETED, "is_complete"], # noqa: E501
["_create", States.INITIAL, States.CREATING],
["_video_created", States.CREATING, States.RETRIEVING],
["_video_retrieved", States.RETRIEVING, States.COMPLETED, "is_stream"], # noqa: E501
["_video_retrieved", States.RETRIEVING, States.PARSING],
["_video_parsed", States.PARSING, States.COMPLETED, "subtitle_disabled"], # noqa: E501
["_video_parsed", States.PARSING, States.SUB_RETRIEVING],
["_video_subtitle_fetched", States.SUB_RETRIEVING, States.COMPLETED],

["_operation_error", States.CREATING, States.ABORTED],
["_operation_error", '*', States.DELETING],
["_video_deleted", States.DELETING, States.ABORTED],
]
# fmt: on

def __init__(self, id, app_facade, data_facade, video: Video):
logger = structlog.get_logger(__name__)
super().__init__(
logger,
self,
id,
app_facade,
initial=VideoWorkflow.States.INITIAL,
Expand Down Expand Up @@ -90,7 +90,7 @@ def on_enter_PARSING(self, _):
self._video.id,
)

def on_enter_FINALISING(self, _):
def on_enter_SUB_RETRIEVING(self, _):
self._observe_dispatch(
VideoEvt.VideoSubtitleFetched,
Cmd.FetchVideoSubtitle,
Expand All @@ -113,3 +113,6 @@ def is_complete(self):

def is_stream(self, _):
return self._video_repo.get(self._video.id).streamable()

def subtitle_disabled(self, _):
return not config["subtitle.enabled"]
Loading

0 comments on commit 0ee2591

Please sign in to comment.