Skip to content

Commit

Permalink
feat(csaf): extend csaf models and fix type hints
Browse files Browse the repository at this point in the history
RHINENG-7697
  • Loading branch information
psegedy committed Jan 19, 2024
1 parent 0cb451e commit 19167bb
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 15 deletions.
2 changes: 1 addition & 1 deletion vmaas/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class Singleton(type):
"""Singleton object."""

_instances = {}
_instances: dict = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
Expand Down
9 changes: 5 additions & 4 deletions vmaas/reposcan/database/csaf_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

from vmaas.reposcan.database.object_store import ObjectStore
from vmaas.reposcan.mnm import CSAF_FAILED_DELETE
from vmaas.reposcan.redhatcsaf.modeling import CsafFileCollection
from vmaas.reposcan.redhatcsaf.modeling import CsafData
from vmaas.reposcan.redhatcsaf.modeling import CsafFiles


class CsafStore(ObjectStore):
Expand All @@ -29,7 +30,7 @@ def delete_csaf_file(self, name: str):
finally:
cur.close()

def _save_csaf_files(self, csaf_files: CsafFileCollection) -> [int]:
def _save_csaf_files(self, csaf_files: CsafFiles) -> list[int]:
cur = self.conn.cursor()
db_ids = execute_values(
cur,
Expand All @@ -45,7 +46,7 @@ def _save_csaf_files(self, csaf_files: CsafFileCollection) -> [int]:
self.conn.commit()
return db_ids

def store(self, csaf_files: CsafFileCollection):
def store(self, csaf_data: CsafData):
"""Store collection of CSAF files into DB."""
self._save_csaf_files(csaf_files)
self._save_csaf_files(csaf_data.files)
# TODO: populate future objects in csaf store
110 changes: 100 additions & 10 deletions vmaas/reposcan/redhatcsaf/modeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import csv
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
from typing import Iterator
from typing import Optional
from typing import overload

import attr

Expand All @@ -33,20 +33,20 @@ def out_of_date(self) -> bool:


@attr.s(auto_attribs=True, repr=False)
class CsafFileCollection:
class CsafFiles:
"""Collection of CSAF Files."""

_files: dict[str, CsafFile] = attr.Factory(dict)

@classmethod
def from_table_map_and_csv(cls, table_map: dict[str, tuple[int, str]], csv_path: str) -> CsafFileCollection:
def from_table_map_and_csv(cls, table_map: dict[str, tuple[int, datetime]], csv_path: str) -> CsafFiles:
"""Initialize class from CsafStore.csaf_file_map table_map and changes.csv."""
obj = cls()
obj._update_from_table_map(table_map)
obj._update_from_csv(csv_path)
return obj

def _update_from_table_map(self, table_map: dict[str, tuple[int, str]]) -> None:
def _update_from_table_map(self, table_map: dict[str, tuple[int, datetime]]) -> None:
"""Update files from `CsafStore.csaf_file_map` table_map."""
for key, val in table_map.items():
id_, timestamp = val
Expand All @@ -66,11 +66,11 @@ def _update_from_csv(self, path: str) -> None:
self[obj.name] = obj

@property
def out_of_date(self) -> Generator[CsafFile, None, None]:
def out_of_date(self) -> filter[CsafFile]:
"""Filter generator of out of date files."""
return filter(lambda x: x.out_of_date, self)

def to_tuples(self, attributes: tuple[str]) -> list[tuple[str, datetime]]:
def to_tuples(self, attributes: tuple[str, ...]) -> list[tuple]:
"""Transform data to list of tuples with chosen attributes."""
res = []
for item in self:
Expand All @@ -80,27 +80,117 @@ def to_tuples(self, attributes: tuple[str]) -> list[tuple[str, datetime]]:
res.append(tuple(items))
return res

def get(self, key, default=None) -> CsafFile:
@overload
def get(self, key: str, default: None = None) -> Optional[CsafFile]:
...

@overload
def get(self, key: str, default: CsafFile) -> CsafFile:
...

def get(self, key, default=None):
"""Return the value for key if key is in the collection, else default."""
return self._files.get(key, default)

def update(self, data: CsafFiles) -> None:
"""Update data in collection - same as dict.update()."""
self._files.update(data._files) # pylint: disable=protected-access

def __getitem__(self, key: str) -> CsafFile:
return self._files[key]

def __setitem__(self, key: str, val: CsafFile) -> None:
self._files[key] = val

def __contains__(self, val: str) -> bool:
return val in self._files
def __contains__(self, key: str) -> bool:
return key in self._files

def __iter__(self) -> Iterator[CsafFile]:
return iter(self._files.values())

def __next__(self) -> CsafFile:
return next(self)
return next(iter(self))

def __repr__(self) -> str:
return repr(self._files)

def __len__(self) -> int:
return len(self._files)


@dataclass
class CsafProduct:
"""CSAF Product object."""

cpe: str
package: str
module: Optional[str] = None


@attr.s(auto_attribs=True, repr=False)
class CsafCves:
"""Collection of CSAF CVEs."""

_cves: dict[str, list[CsafProduct]] = attr.Factory(dict)

def to_tuples(self, key: str, attributes: tuple[str, ...]) -> list[tuple]:
"""Transform data to list of tuples with chosen attributes by key.
Example:
> collection = CsafCves({'key1': [CsafProduct(cpe='cpe123', package='kernel', module="module:8")]})
> collection.to_tuples("key1", ("cpe", "package", "module"))
> [("cpe123", "kernel", "module:8")]
"""
res = []
for item in self[key]:
items = []
for attribute in attributes:
items.append(getattr(item, attribute))
res.append(tuple(items))
return res

@overload
def get(self, key: str, default: None = None) -> Optional[list[CsafProduct]]:
...

@overload
def get(self, key: str, default: list[CsafProduct]) -> list[CsafProduct]:
...

def get(self, key, default=None):
"""Return the value for key if key is in the collection, else default."""
return self._cves.get(key, default)

def update(self, data: CsafCves) -> None:
"""Update data in collection - same as dict.update()."""
self._cves.update(data._cves) # pylint: disable=protected-access

def __getitem__(self, key: str) -> list[CsafProduct]:
return self._cves[key]

def __setitem__(self, key: str, val: list[CsafProduct]) -> None:
self._cves[key] = val

def __iter__(self) -> Iterator[list[CsafProduct]]:
return iter(self._cves.values())

def __next__(self) -> list[CsafProduct]:
return next(iter(self))

def __contains__(self, key: str) -> bool:
return key in self._cves

def __repr__(self) -> str:
return repr(self._cves)

def __len__(self) -> int:
return len(self._cves)


@dataclass
class CsafData:
"""CSAF Data class."""

files: CsafFiles = attr.Factory(CsafFiles)
cves: CsafCves = attr.Factory(CsafCves)

0 comments on commit 19167bb

Please sign in to comment.