Skip to content

Commit

Permalink
feat(csaf): parse CSAF JSONs into CsafCves
Browse files Browse the repository at this point in the history
RHINENG-7697
  • Loading branch information
psegedy committed Jan 19, 2024
1 parent 19167bb commit af1fb70
Showing 1 changed file with 70 additions and 5 deletions.
75 changes: 70 additions & 5 deletions vmaas/reposcan/redhatcsaf/csaf_controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Module containing class for syncing set of CSAF files into the DB.
"""
import json
import os
import shutil
import tempfile
Expand All @@ -14,7 +15,11 @@
from vmaas.reposcan.download.downloader import FileDownloader
from vmaas.reposcan.download.downloader import VALID_HTTP_CODES
from vmaas.reposcan.mnm import CSAF_FAILED_DOWNLOAD
from vmaas.reposcan.redhatcsaf.modeling import CsafFileCollection
from vmaas.reposcan.redhatcsaf.modeling import CsafCves
from vmaas.reposcan.redhatcsaf.modeling import CsafData
from vmaas.reposcan.redhatcsaf.modeling import CsafFile
from vmaas.reposcan.redhatcsaf.modeling import CsafFiles
from vmaas.reposcan.redhatcsaf.modeling import CsafProduct

CSAF_VEX_BASE_URL = os.getenv("CSAF_VEX_BASE_URL", "https://access.redhat.com/security/data/csaf/beta/vex/")
CSAF_VEX_INDEX_CSV = os.getenv("CSAF_VEX_INDEX_CSV", "changes.csv")
Expand Down Expand Up @@ -77,7 +82,7 @@ def store(self):

db_csaf_files = self.csaf_store.csaf_file_map.copy()
batches = BatchList()
csaf_files = CsafFileCollection.from_table_map_and_csv(db_csaf_files, self.index_path)
csaf_files = CsafFiles.from_table_map_and_csv(db_csaf_files, self.index_path)
files_to_sync = csaf_files
if not CSAF_SYNC_ALL_FILES:
files_to_sync = csaf_files.out_of_date
Expand All @@ -97,11 +102,71 @@ def store(self):
self.logger.warning("%d CSAF files failed to download.", len(failed))
batch = [f for f in batch if (self.tmp_directory / f.name) not in failed]

to_store = CsafFileCollection()
to_store = CsafData()
for csaf_file in batch:
to_store[csaf_file.name] = csaf_file
# TODO: processing of files
to_store.files[csaf_file.name] = csaf_file
to_store.cves.update(self.parse_csaf_file(csaf_file))

self.csaf_store.store(to_store)
finally:
self.clean()

def parse_csaf_file(self, csaf_file: CsafFile) -> CsafCves:
"""Parse CSAF file to CsafCves."""
product_cpe = {}
cves = CsafCves()

with open(self.tmp_directory / csaf_file.name, "r", encoding="utf-8") as csaf_json:
csaf = json.load(csaf_json)
product_cpe = self._parse_product_tree(csaf)
unfixed_cves = self._parse_vulnerabilities(csaf, product_cpe)
cves.update(unfixed_cves)
return cves

def _parse_vulnerabilities(self, csaf: dict, product_cpe: dict) -> CsafCves:
unfixed_cves = CsafCves()
for vulnerability in csaf["vulnerabilities"]:
if "cve" not in vulnerability:
# `vulnerability` can be identified by `cve` or `ids`, we are interested only in those with `cve`
continue

# parse only CVEs with `known_affected` products aka `unfixed` CVEs
# TODO: do we need to parse also `under_investigation` products, does that mean the CVE is unfixed too?
unfixed_cves[vulnerability["cve"]] = []
for unfixed in vulnerability["product_status"].get("known_affected", []):
branch_product, rest = unfixed.split(":", 1)
pkg_name = rest
module = None
if "/" in rest:
# it's a package with a module
module, pkg_name = rest.split("/", 1)

csaf_product = CsafProduct(product_cpe[branch_product], pkg_name, module)
unfixed_cves[vulnerability["cve"]].append(csaf_product)

return unfixed_cves

def _parse_product_tree(self, csaf: dict) -> dict[str, str]:
product_cpe: dict[str, str] = {}
for branches in csaf.get("product_tree", {}).get("branches", []):
self._parse_branches(branches, product_cpe)

return product_cpe

def _parse_branches(self, branches: dict, product_cpe: dict[str, str]):
if branches.get("category") not in ("vendor", "product_family"):
return
sub_branches = branches.get("branches", [])
for sub_branch in sub_branches:
if "branches" in sub_branch:
self._parse_branches(sub_branch, product_cpe)

if sub_branch.get("category") != "product_name":
continue

product = sub_branch.get("product", {})
product_id = product.get("product_id")
cpe = product.get("product_identification_helper", {}).get("cpe")
if product_id is None or cpe is None:
continue
product_cpe[product_id] = cpe

0 comments on commit af1fb70

Please sign in to comment.