From dfcbd5d2dbc93d9f92352d1c8d61fc8184bb6f85 Mon Sep 17 00:00:00 2001 From: Arrrrr <11490236+arrrrrrrrrrrrrrrrrr@users.noreply.github.com> Date: Thu, 14 Nov 2024 23:02:32 -0800 Subject: [PATCH 1/4] Add files via upload --- ...00_c99239e3445f_add_pause_functionality.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py diff --git a/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py new file mode 100644 index 00000000..2b81d46e --- /dev/null +++ b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py @@ -0,0 +1,39 @@ +"""add_pause_functionality + +Revision ID: [generate a new revision ID] +Revises: c99709e3648f +Create Date: 2024-11-14 16:00:00.000000 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = '[generate a new revision ID]' +down_revision: Union[str, None] = 'c99709e3648f' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add pause-related columns to MediaItem table + op.add_column('MediaItem', + sa.Column('is_paused', sa.Boolean(), nullable=True, default=False)) + op.add_column('MediaItem', + sa.Column('paused_at', sa.DateTime(), nullable=True)) + op.add_column('MediaItem', + sa.Column('paused_by', sa.String(), nullable=True)) + + # Add index for is_paused column + op.create_index(op.f('ix_mediaitem_is_paused'), 'MediaItem', ['is_paused']) + + +def downgrade() -> None: + # Remove pause-related columns from MediaItem table + op.drop_index(op.f('ix_mediaitem_is_paused'), table_name='MediaItem') + op.drop_column('MediaItem', 'paused_by') + op.drop_column('MediaItem', 'paused_at') + op.drop_column('MediaItem', 'is_paused') \ No newline at end of file From 3db969e1d26ae0886a92d02f4d26a86b5f14dcb6 Mon Sep 17 00:00:00 2001 From: Arrrrr <11490236+arrrrrrrrrrrrrrrrrr@users.noreply.github.com> Date: Thu, 14 Nov 2024 23:03:36 -0800 Subject: [PATCH 2/4] Add files via upload --- src/program/state_transition.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/program/state_transition.py b/src/program/state_transition.py index 9ecec854..eb4a13cf 100644 --- a/src/program/state_transition.py +++ b/src/program/state_transition.py @@ -17,45 +17,62 @@ def process_event(emitted_by: Service, existing_item: MediaItem | None = None, c no_further_processing: ProcessedEvent = (None, []) items_to_submit = [] -#TODO - Reindex non-released badly indexed items here + # Skip processing if item is paused + if existing_item and existing_item.is_paused: + logger.debug(f"Skipping {existing_item.log_string} - item is paused") + return no_further_processing + + # Process new content items or requested items if content_item or (existing_item is not None and existing_item.last_state == States.Requested): next_service = TraktIndexer logger.debug(f"Submitting {content_item.log_string if content_item else existing_item.log_string} to trakt indexer") return next_service, [content_item or existing_item] + # Process partially completed or ongoing items elif existing_item is not None and existing_item.last_state in [States.PartiallyCompleted, States.Ongoing]: if existing_item.type == "show": for season in existing_item.seasons: - if season.last_state not in [States.Completed, States.Unreleased]: + # Skip paused seasons + if not season.is_paused and season.last_state not in [States.Completed, States.Unreleased]: _, sub_items = process_event(emitted_by, season, None) items_to_submit += sub_items elif existing_item.type == "season": for episode in existing_item.episodes: - if episode.last_state != States.Completed: + # Skip paused episodes + if not episode.is_paused and episode.last_state != States.Completed: _, sub_items = process_event(emitted_by, episode, None) items_to_submit += sub_items + # Process indexed items elif existing_item is not None and existing_item.last_state == States.Indexed: next_service = Scraping if emitted_by != Scraping and Scraping.should_submit(existing_item): items_to_submit = [existing_item] elif existing_item.type == "show": - items_to_submit = [s for s in existing_item.seasons if s.last_state != States.Completed and Scraping.should_submit(s)] + # Filter out paused seasons + items_to_submit = [s for s in existing_item.seasons + if not s.is_paused and s.last_state != States.Completed and Scraping.should_submit(s)] elif existing_item.type == "season": - items_to_submit = [e for e in existing_item.episodes if e.last_state != States.Completed and Scraping.should_submit(e)] + # Filter out paused episodes + items_to_submit = [e for e in existing_item.episodes + if not e.is_paused and e.last_state != States.Completed and Scraping.should_submit(e)] + # Process scraped items elif existing_item is not None and existing_item.last_state == States.Scraped: next_service = Downloader items_to_submit = [existing_item] + # Process downloaded items elif existing_item is not None and existing_item.last_state == States.Downloaded: next_service = Symlinker items_to_submit = [existing_item] + # Process symlinked items elif existing_item is not None and existing_item.last_state == States.Symlinked: next_service = Updater items_to_submit = [existing_item] + # Process completed items elif existing_item is not None and existing_item.last_state == States.Completed: # If a user manually retries an item, lets not notify them again if emitted_by not in ["RetryItem", PostProcessing]: @@ -73,11 +90,6 @@ def process_event(emitted_by: Service, existing_item: MediaItem | None = None, c if not items_to_submit: return no_further_processing else: - return no_further_processing - # if items_to_submit and next_service: - # for item in items_to_submit: - # logger.debug(f"Submitting {item.log_string} ({item.id}) to {next_service if isinstance(next_service, str) else next_service.__name__}") - return next_service, items_to_submit From d3e3d3ed481f55a6d8be6c6ffc6497805f27b426 Mon Sep 17 00:00:00 2001 From: Arrrrr <11490236+arrrrrrrrrrrrrrrrrr@users.noreply.github.com> Date: Thu, 14 Nov 2024 23:04:30 -0800 Subject: [PATCH 3/4] Add files via upload --- src/program/media/item.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/program/media/item.py b/src/program/media/item.py index 5b081963..e6692af1 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -77,6 +77,10 @@ class MediaItem(db.Model): last_state: Mapped[Optional[States]] = mapped_column(sqlalchemy.Enum(States), default=States.Unknown) subtitles: Mapped[list[Subtitle]] = relationship(Subtitle, back_populates="parent", lazy="selectin", cascade="all, delete-orphan") + # Pause related fields + is_paused: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False) + paused_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) + __mapper_args__ = { "polymorphic_identity": "mediaitem", "polymorphic_on":"type", @@ -257,6 +261,8 @@ def to_dict(self): "requested_by": self.requested_by, "scraped_at": str(self.scraped_at), "scraped_times": self.scraped_times, + "is_paused": self.is_paused, + "paused_at": str(self.paused_at) if self.paused_at else None, } def to_extended_dict(self, abbreviated_children=False, with_streams=True): @@ -407,6 +413,24 @@ def _reset(self): def log_string(self): return self.title or self.id + def pause(self) -> None: + """Pause processing of this media item""" + logger.debug(f"Pausing {self.id}") + self.is_paused = True + self.paused_at = datetime.now() + logger.info(f"{self.log_string} paused, is_paused={self.is_paused}, paused_at={self.paused_at}") + + # Get the session and flush changes + session = object_session(self) + if session: + session.flush() + + def unpause(self) -> None: + """Resume processing of this media item""" + self.is_paused = False + self.paused_at = None + logger.info(f"{self.log_string} unpaused") + @property def collection(self): return self.parent.collection if self.parent else self.id From 3e9ca0eec54ef0d23af87b42071504a33bef3bdd Mon Sep 17 00:00:00 2001 From: Arrrrr <11490236+arrrrrrrrrrrrrrrrrr@users.noreply.github.com> Date: Thu, 14 Nov 2024 23:04:56 -0800 Subject: [PATCH 4/4] Add files via upload --- src/routers/secure/items.py | 207 ++++++++++++++++++++++++++++++------ 1 file changed, 175 insertions(+), 32 deletions(-) diff --git a/src/routers/secure/items.py b/src/routers/secure/items.py index da7ead7b..f21fec83 100644 --- a/src/routers/secure/items.py +++ b/src/routers/secure/items.py @@ -12,7 +12,7 @@ from program.db import db_functions from program.db.db import db, get_db -from program.media.item import MediaItem +from program.media.item import MediaItem, MediaType # Import MediaType from item module from program.media.state import States from program.services.content import Overseerr from program.symlink import Symlinker @@ -20,10 +20,50 @@ from ..models.shared import MessageResponse +class StateResponse(BaseModel): + success: bool + states: list[str] + +class ItemsResponse(BaseModel): + success: bool + items: list[dict] + page: int + limit: int + total_items: int + total_pages: int + +class ResetResponse(BaseModel): + message: str + ids: list[str] + +class RetryResponse(BaseModel): + message: str + ids: list[str] + +class RemoveResponse(BaseModel): + message: str + ids: list[str] + +class PauseResponse(BaseModel): + """Response model for pause/unpause operations""" + message: str + ids: list[str] + +class PauseStateResponse(BaseModel): + """Response model for pause state check""" + is_paused: bool + paused_at: Optional[str] + item_id: str + title: Optional[str] + +class AllPausedResponse(BaseModel): + """Response model for getting all paused items""" + count: int + items: list[dict] + router = APIRouter( prefix="/items", tags=["items"], - responses={404: {"description": "Not found"}}, ) @@ -34,11 +74,6 @@ def handle_ids(ids: str) -> list[str]: return ids -class StateResponse(BaseModel): - success: bool - states: list[str] - - @router.get("/states", operation_id="get_states") async def get_states() -> StateResponse: return { @@ -47,15 +82,6 @@ async def get_states() -> StateResponse: } -class ItemsResponse(BaseModel): - success: bool - items: list[dict] - page: int - limit: int - total_items: int - total_pages: int - - @router.get( "", summary="Retrieve Media Items", @@ -254,11 +280,6 @@ async def get_items_by_imdb_ids(request: Request, imdb_ids: str) -> list[dict]: return [item.to_extended_dict() for item in items] -class ResetResponse(BaseModel): - message: str - ids: list[str] - - @router.post( "/reset", summary="Reset Media Items", @@ -284,11 +305,6 @@ async def reset_items(request: Request, ids: str) -> ResetResponse: return {"message": f"Reset items with id {ids}", "ids": ids} -class RetryResponse(BaseModel): - message: str - ids: list[str] - - @router.post( "/retry", summary="Retry Media Items", @@ -314,11 +330,6 @@ async def retry_items(request: Request, ids: str) -> RetryResponse: return {"message": f"Retried items with ids {ids}", "ids": ids} -class RemoveResponse(BaseModel): - message: str - ids: list[str] - - @router.delete( "/remove", summary="Remove Media Items", @@ -432,4 +443,136 @@ async def unblacklist_stream(_: Request, item_id: str, stream_id: int, db: Sessi return { "message": f"Unblacklisted stream {stream_id} for item {item_id}", - } \ No newline at end of file + } + +@router.post("/{ids}/pause", response_model=PauseResponse, operation_id="pause_items") +async def pause_items(request: Request, ids: str = None, db: Session = Depends(get_db)): + """Pause media items from being processed""" + item_ids = handle_ids(ids) + + items = db.execute( + select(MediaItem).where(MediaItem.id.in_(item_ids)) + ).unique().scalars().all() + + if not items: + raise HTTPException(status_code=404, detail="No items found") + + for item in items: + item.pause() + db.commit() + + return { + "message": f"Successfully paused {len(items)} items", + "ids": item_ids + } + +@router.post("/{ids}/unpause", response_model=PauseResponse, operation_id="unpause_items") +async def unpause_items(request: Request, ids: str = None, db: Session = Depends(get_db)): + """Unpause media items to resume processing""" + item_ids = handle_ids(ids) + + items = db.execute( + select(MediaItem).where(MediaItem.id.in_(item_ids)) + ).unique().scalars().all() + + if not items: + raise HTTPException(status_code=404, detail="No items found") + + for item in items: + item.unpause() + db.commit() + + return { + "message": f"Successfully unpaused {len(items)} items", + "ids": item_ids + } + +@router.get("/{id}/pause", response_model=PauseStateResponse, operation_id="get_pause_state") +async def get_pause_state(request: Request, id: str, db: Session = Depends(get_db)): + """Check if a media item is paused""" + item = db.execute( + select(MediaItem).where(MediaItem.id == id) + ).unique().scalar_one_or_none() + + if not item: + raise HTTPException(status_code=404, detail="Item not found") + + return { + "is_paused": item.is_paused, + "paused_at": str(item.paused_at) if item.paused_at else None, + "item_id": item.id, + "title": item.title + } + +@router.get("/paused", response_model=AllPausedResponse, operation_id="get_all_paused", responses={200: {"description": "Success, even if no items found"}}, response_model_exclude_none=True) +async def get_all_paused( + request: Request, + type: Optional[str] = None, + db: Session = Depends(get_db) +): + """Get all paused media items, optionally filtered by type""" + # Use raw SQL to verify data + sql = "SELECT id, title, type, is_paused, paused_at FROM \"MediaItem\" WHERE is_paused = true" + if type: + valid_types = [t.value for t in MediaType] + if type not in valid_types: + raise HTTPException( + status_code=400, + detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" + ) + sql += f" AND type = '{type}'" + + logger.debug(f"Executing SQL: {sql}") + result = db.execute(sql) + rows = result.fetchall() + logger.debug(f"Raw SQL found {len(rows)} rows") + + # Map results to response model + items = [ + { + "id": row.id, + "title": row.title, + "type": row.type, + "paused_at": str(row.paused_at) if row.paused_at else None + } + for row in rows + ] + + return AllPausedResponse(count=len(items), items=items) + +@router.get("/paused/count", response_model=dict, operation_id="get_paused_count") +async def get_paused_count( + request: Request, + type: Optional[str] = None, + db: Session = Depends(get_db) +): + """Get total count of paused items, optionally filtered by type""" + try: + # Build query using SQLAlchemy + query = select(func.count(MediaItem.id)).where(MediaItem.is_paused.is_(True)) + + if type: + valid_types = [t.value for t in MediaType] + if type not in valid_types: + raise HTTPException( + status_code=400, + detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" + ) + query = query.where(MediaItem.type == type) + + logger.debug(f"Executing count query: {str(query)}") + count = db.scalar(query) or 0 # Default to 0 if None + logger.debug(f"Found {count} paused items") + + return { + "total": count, + "type": type if type else "all" + } + except Exception as e: + logger.error(f"Error in get_paused_count: {str(e)}") + # Return 0 count on error + return { + "total": 0, + "type": type if type else "all", + "error": "Failed to get count" + } \ No newline at end of file