feat: add component code segmentation

Extracts source code segments for doc string, signature, python code
and yaml/jinja code.

Only handles errors that hinder segmentation and expects the python and
yaml parsers to deal with the rest.

The segmentation is based on the tokenize module, which officially only
supports syntactically valid python code. The implemented solution
should be sufficiently clean and stable for the given use case anyway.
The python version in the coreos-installer container shouldn't change
too often, and unit testing should catch any breaking changes in the
tokenize module.
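
For illustration, a hypothetical component source with all four segments
could look as follows (the file contents are invented; only the layout of
doc string, parenthesized signature, and "---"-delimited python and yaml
parts reflects what the segmenter expects):

    """Sets the machine hostname."""

    (name: str)

    ---
    path = "/etc/hostname"

    ---
    storage: {{ path }}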
the-dipsy committed Feb 18, 2024
1 parent 4b5233a commit 93632ca
Showing 7 changed files with 349 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pyromaniac/compiler/code/errors.py
@@ -0,0 +1,5 @@
from ..errors import CompilerError


class CodeError(CompilerError):
    pass
11 changes: 11 additions & 0 deletions pyromaniac/compiler/code/segment/__init__.py
@@ -0,0 +1,11 @@
from .errors import (
    SegmentError,
    UnexpectedTokenError,
    InvalidSignatureError,
)
from .segment import segment

__all__ = [
    "segment",
    "SegmentError", "UnexpectedTokenError", "InvalidSignatureError",
]
47 changes: 47 additions & 0 deletions pyromaniac/compiler/code/segment/errors.py
@@ -0,0 +1,47 @@
from typing import TYPE_CHECKING
from ..errors import CodeError

if TYPE_CHECKING:
    from .token import Token


class SegmentError(CodeError):
    """Code segmenting error.
    :param token: token at which error occurred
    """

    def __init__(self, token: 'Token'):
        self.token = token

    @property
    def line(self) -> int:
        """Shortcut for token info start line."""
        return self.token.info.start[0]


class InvalidSignatureError(SegmentError):
    """Invalid signature error.
    :param token: token at which error occurred
    """

    def __str__(self) -> str:
        return f"unmatched delimiter in signature in line {self.line}"


class UnexpectedTokenError(SegmentError):
    """Unexpected token error.
    :param token: token at which error occurred
    :param location: string describing where the error occurred
    """

    def __init__(self, token: 'Token', location: str):
        super().__init__(token)
        self.location = location

    def __str__(self) -> str:
        string = repr(self.token.string)
        return f"unexpected {string} {self.location} in line {self.line}"
27 changes: 27 additions & 0 deletions pyromaniac/compiler/code/segment/segment.py
@@ -0,0 +1,27 @@
from .segmenter import Segmenter


def segment(
    code: str
) -> tuple[str | None, str | None, str | None, str | None]:
    """Segment the source code into doc string, signature, python, and yaml.
    Returns either the segment string or None for each possible segment. Makes
    sure that the line numbers stay unaltered for python and yaml code to
    enable accurate error messages during parsing.
    :param code: source code to create segments from
    :returns: string or None for each segment respectively
    """
    doc, sig, python, yaml = Segmenter(code).segment()
    return (
        code[doc] if doc is not None else None,
        code[sig] if sig is not None else None,
        extract(code, python) if python is not None else None,
        extract(code, yaml) if yaml is not None else None,
    )


# extract a code segment without altering line numbers
def extract(code: str, slc: slice) -> str:
    return "\n" * code[:slc.start].count("\n") + code[slc]
116 changes: 116 additions & 0 deletions pyromaniac/compiler/code/segment/segmenter.py
@@ -0,0 +1,116 @@
import tokenize as t

from .errors import UnexpectedTokenError, InvalidSignatureError
from .token import Token
from .stream import Stream

# token types to ignore between meaningful tokens
TYPES = [t.NL, t.NEWLINE, t.COMMENT]


class Segmenter:
    """Source code segmenter.
    :param code: source code to segment
    """

    def __init__(self, code: str):
        self.tokens = Stream(code)
        self.length = len(code)

    def segment(
        self
    ) -> tuple[slice | None, slice | None, slice | None, slice | None]:
        """Segment source code into doc string, signature, python and yaml.
        Returns either a slice or None for each possible segment. If a slice,
        it can be used to index the source code to get the according segment.
        Raises errors when the doc string or signature is followed by
        unexpected tokens or the signature isn't closed, but leaves all other
        error detection to the python and yaml parsers.
        :returns: tuple of optional slices representing source code positions
        """
        # initialize result slices
        doc, sig, python, yaml = (None,) * 4

        # consume encoding token
        last = self.tokens.consume([t.ENCODING])
        if last is None:
            token = self.tokens.get(0)
            raise UnexpectedTokenError(token, "at the beginning")

        # get doc string if present
        last = self.tokens.consume(TYPES) or last
        if self.tokens.match(t.STRING):
            doc, last = self.read_doc()
            last = self.tokens.consume(TYPES)
            if last is None:
                token = self.tokens.get(0)
                raise UnexpectedTokenError(token, "after the doc string")

        # get signature if present
        if self.tokens.match((t.OP, '(')):
            sig, last = self.read_signature()
            last = self.tokens.consume(TYPES)
            if last is None:
                token = self.tokens.get(0)
                raise UnexpectedTokenError(token, "after the signature")

        # get python code if present
        if self.tokens.match(
            (t.OP, '-'), (t.OP, '-'), (t.OP, '-'), t.NEWLINE,
        ):
            last = self.tokens.consume(3)
            python, last, end = self.read_python()
        else:
            end = False

        # get yaml code if present
        if not end:
            yaml = slice(last.stop, self.length)

        # return slices
        return doc, sig, python, yaml

    # read the doc string and return the slice and the last consumed token
    def read_doc(self) -> tuple[slice, Token]:
        last = self.tokens.consume(1)
        return last.slice, last

    # read the signature and return the slice and the last consumed token
    def read_signature(self) -> tuple[slice, Token]:
        balance = 0

        # consume opening parenthesis
        last = self.tokens.consume(1)
        start = last.start
        balance += 1

        # consume until matching closing parenthesis
        while balance > 0:
            if self.tokens.match((t.OP, '(')):
                balance += 1
            elif self.tokens.match((t.OP, ')')):
                balance -= 1
            elif self.tokens.match(t.ERRORTOKEN):
                raise InvalidSignatureError(self.tokens.get(0))
            last = self.tokens.consume(1)

        return slice(start, last.stop), last

    # read the python code and return the slice, the last consumed token,
    # and whether the end of the source was reached
    def read_python(self) -> tuple[slice, Token, bool]:
        last = self.tokens.consume(1)
        start = self.tokens.get(0).start

        while True:
            if self.tokens.match(t.ENDMARKER):
                return slice(start, last.stop), last, True
            elif self.tokens.match(t.ERRORTOKEN):
                return slice(start, self.length), last, True
            elif last.type in (t.NL, t.NEWLINE) and self.tokens.match(
                (t.OP, '-'), (t.OP, '-'), (t.OP, '-'), t.NEWLINE,
            ):
                return slice(start, last.stop), self.tokens.consume(4), False
            else:
                last = self.tokens.consume(1)
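
A sketch of the balance counting in read_signature (the component content
is invented): nested parentheses inside the signature are tracked, so the
segment only ends at the matching closing parenthesis:

    from pyromaniac.compiler.code.segment import segment

    source = '(a: int = (1, 2))\nkey: value\n'
    doc, sig, python, yaml = segment(source)
    assert sig == '(a: int = (1, 2))'  # inner parens don't end the signature
    assert yaml == '\nkey: value\n'    # the rest is treated as yaml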
110 changes: 110 additions & 0 deletions pyromaniac/compiler/code/segment/stream.py
@@ -0,0 +1,110 @@
from collections.abc import Iterable
from io import BytesIO
import tokenize as t

from .token import Token


class Stream:
    """Token stream with look ahead and matching.
    Produces an endless stream of ERRORTOKENs after the ENDMARKER or when an
    unclosed pair (parentheses, quotes, etc.) is encountered.
    :param code: source code to parse
    """

    def __init__(self, code: str):
        self.buffer = []
        self.stream = generate(code)

    def match(self, *pattern: int | tuple[int, str]) -> bool:
        """Check if the leading tokens have specified types (and strings).
        Returns True iff the next tokens in the stream match the specified
        types (and string contents) in the specified order.
        :param pattern: list of types and optionally strings to match against
        :returns: whether the leading tokens match the pattern
        """
        for i, pat in enumerate(pattern):
            # never match anything past the end marker
            if i > 0 and self.get(i - 1).type == t.ENDMARKER:
                return False

            # check type and string
            match pat, self.get(i):
                case int(type), tok if type != tok.type:
                    return False
                case (pt, ps), tok if (pt, ps) != (tok.type, tok.string):
                    return False

        # return True if no mismatch occurred
        return True

    def consume(self, what: int | list[int]) -> Token | None:
        """Remove specified tokens from the start of the stream.
        If *what* is an integer, it is interpreted as the number of tokens to
        remove. If it is a list of integers, it is interpreted as a list of
        token types, which are removed until the next token in the stream has
        a different type.
        :param what: count or token types to remove
        :returns: last removed token if any
        """
        last = None

        if isinstance(what, int):
            for _ in range(what):
                last = self.get(0, True)
        else:
            while self.get(0).type in what:
                last = self.get(0, True)

        return last

    def get(self, i: int, pop: bool = False) -> Token:
        """Get the token at position *i* and remove it if requested.
        Makes sure at least *i + 1* tokens are buffered and returns the
        requested token, removing it from the buffer if requested.
        :param i: index of token to return
        :param pop: whether to remove the token
        :returns: the requested token
        """
        while len(self.buffer) <= i:
            self.buffer.append(next(self.stream))
        return self.buffer.pop(i) if pop else self.buffer[i]


# generate tokens with their positions in the source code
def generate(code: str) -> Iterable[Token]:
    line_start = 0
    end = 0
    try:
        for info in t.tokenize(BytesIO(code.encode()).readline):
            # get token start and end
            start = line_start + info.start[1]
            if info.start[0] == info.end[0]:
                slc = slice(start, line_start + info.end[1])
            else:
                slc = slice(start, start + len(info.string))

            # keep track of the start of the current line
            match info.type:
                case t.NL | t.NEWLINE: line_start = slc.stop
                case t.STRING: line_start = slc.stop - info.end[1]
            end = slc.stop

            # yield token and position
            yield Token(info, slc)
    except t.TokenError:
        pass

    # keep yielding an error token on further reading
    message = 'invalid token'
    info = t.TokenInfo(t.ERRORTOKEN, message, info.start, info.start, message)
    token = Token(info, slice(end, end))
    while True:
        yield token
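
A sketch of the look-ahead semantics (assuming the module path below):
match inspects leading tokens without removing them, while consume removes
them either by count or by a list of token types:

    import tokenize as t
    from pyromaniac.compiler.code.segment.stream import Stream

    s = Stream("()\n")
    assert s.match(t.ENCODING)                # look ahead without consuming
    s.consume([t.ENCODING])                   # drop the encoding token by type
    assert s.match((t.OP, '('), (t.OP, ')'))  # match types and strings in order
    last = s.consume(2)                       # remove both parens by count
    assert last.string == ')'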
33 changes: 33 additions & 0 deletions pyromaniac/compiler/code/segment/token.py
@@ -0,0 +1,33 @@
from tokenize import TokenInfo


class Token:
    """Syntactic token with source code position.
    :param info: TokenInfo object
    :param slice: position of token in source code
    """

    def __init__(self, info: TokenInfo, slice: slice):
        self.info = info
        self.slice = slice

    @property
    def type(self) -> int:
        """Shortcut for token info type."""
        return self.info.type

    @property
    def string(self) -> str:
        """Shortcut for token info string."""
        return self.info.string

    @property
    def start(self) -> int:
        """Shortcut for source code position start."""
        return self.slice.start

    @property
    def stop(self) -> int:
        """Shortcut for source code position stop."""
        return self.slice.stop
