Skip to content

Commit

Permalink
fixed timer ReadEClock and added pytask test for it
Browse files Browse the repository at this point in the history
  • Loading branch information
cnvogelg committed Dec 29, 2024
1 parent c9a34bd commit 3c7b74d
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 33 deletions.
36 changes: 24 additions & 12 deletions amitools/vamos/lib/TimerDevice.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
import time
from amitools.vamos.libcore import LibImpl
from amitools.vamos.machine.regs import REG_A0
from amitools.vamos.astructs import AccessStruct
from amitools.vamos.libstructs import DateStampStruct

from datetime import datetime
from amitools.vamos.libstructs import TimeValStruct


class TimerDevice(LibImpl):
def ReadEClock(self, ctx):
eclockval = ctx.cpu.r_reg(REG_A0)
# our simulated EClock freq: 10 MHz
# its around the real EClock (3 MHz)
ECLOCK_HZ = 10_000_000
# how to convert ns time stamp to eclock
ECLOCK_NS_FACTOR = 100

dt = datetime.now()
def _get_eclock_lo_hi(self):
# use the monotonic time here to have a suitable clock for benchmarks
ts_ns = time.monotonic_ns()
eclk = ts_ns // self.ECLOCK_NS_FACTOR
eclk_lo = eclk & 0xFFFFFFFF
eclk_hi = eclk >> 32
return eclk_lo, eclk_hi

def ReadEClock(self, ctx):
addr = ctx.cpu.r_reg(REG_A0)
# get val struct
tv = TimeValStruct(mem=ctx.mem, addr=addr)

# abuse DateStampStruct
tv = AccessStruct(ctx.mem, DateStampStruct, struct_addr=eclockval)
tv.ds_Days = dt.microsecond / 1000000
tv.ds_Minute = dt.microsecond % 1000000
lo, hi = self._get_eclock_lo_hi()
tv.secs.val = hi
tv.micro.val = lo

return 50
# always return eclock freq
return self.ECLOCK_HZ
71 changes: 53 additions & 18 deletions amitools/vamos/libcore/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,44 @@ def __init__(self, ctx, base_addr=None, run_sp=None):
self.run_sp = run_sp


class LibProxyRegs:
def __init__(self, ctx, args, arg_regs, kwargs):
assert len(args) == len(arg_regs)
self.ctx = ctx
self.args = args
self.arg_regs = arg_regs
# shall we return d1 as well?
self.ret_d1 = kwargs.pop("ret_d1", False)
self.kw_args = kwargs
# auto strings
self.auto_strings = []

def input_reg_map(self):
reg_map = {}
for reg, val in zip(self.arg_regs, self.args):
# auto convert strings
if type(val) is str:
str_mem = self.ctx.alloc.alloc_cstr(val)
val = str_mem.addr
self.auto_strings.append(str_mem)

reg_map[reg] = val
return reg_map

def output_reg_list(self):
regs = [REG_D0]
if self.ret_d1:
regs.append(REG_D1)
return regs

def return_d1(self):
return self.ret_d1

def cleanup(self):
for mem in self.auto_strings:
self.ctx.alloc.free_cstr(mem)


class LibProxyGen:
"""Generate a new type derived from LibProxy holding all functions"""

Expand All @@ -33,22 +71,22 @@ def _gen_arg_regs(self, func_def):
def _gen_stub_call(self, arg_regs, stub_method):
def stub_call(self, *args, **kwargs):
"""proxy function to call lib stub directly"""
# ensure that all positional args are given
assert len(args) == len(arg_regs)
regs = LibProxyRegs(self.ctx, args, arg_regs, kwargs)

# fill registers with arg values
for reg, val in zip(arg_regs, args):
reg_map = regs.input_reg_map()
for reg, val in reg_map.items():
self.ctx.cpu.w_reg(reg, val)

# shall we return d1 as well?
ret_d1 = kwargs.pop("ret_d1", False)

# perform call at stub
stub_method(**kwargs)

# clean up auto strings
regs.cleanup()

# prepare return value
d0 = self.ctx.cpu.r_reg(REG_D0)
if ret_d1:
if regs.return_d1():
d1 = self.ctx.cpu.r_reg(REG_D1)
return (d0, d1)
else:
Expand All @@ -59,25 +97,22 @@ def stub_call(self, *args, **kwargs):
def _gen_lib_call(self, arg_regs, bias, name=None):
def lib_call(self, *args, **kwargs):
# ensure that all positional args are given
assert len(args) == len(arg_regs)
regs = LibProxyRegs(self.ctx, args, arg_regs, kwargs)

reg_map = {}
for reg, val in zip(arg_regs, args):
reg_map[reg] = val

ret_regs = [REG_D0]
# shall we return d1 as well?
ret_d1 = kwargs.pop("ret_d1", False)
if ret_d1:
ret_regs.append(REG_D1)
# get input/output reg map/list
reg_map = regs.input_reg_map()
ret_regs = regs.output_reg_list()

jump_addr = self.base_addr - bias

# perform native run
code = Code(jump_addr, self.run_sp, reg_map, ret_regs)
rs = self.ctx.runner(code, name=name)

if ret_d1:
# cleanup regs
regs.cleanup()

if regs.return_d1():
return rs.regs[REG_D0], rs.regs[REG_D1]
else:
return rs.regs[REG_D0]
Expand Down
8 changes: 6 additions & 2 deletions amitools/vamos/libmgr/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def open_lib_proxy(self, full_name, version=0, run_sp=None):
if base_addr == 0:
log_libmgr.warning("proxy: open_lib '%s' failed!", full_name)
return None
return self.open_lib_proxy_addr(base_addr, run_sp=run_sp)

def open_lib_proxy_addr(self, base_addr, run_sp=None):
# is a vlib?
vlib = self.lib_mgr.get_vlib_by_addr(base_addr)
if vlib:
Expand All @@ -59,10 +62,11 @@ def open_lib_proxy(self, full_name, version=0, run_sp=None):
if fd:
return self._setup_libcall_proxy(name, fd, base_addr, run_sp)
else:
log_libmgr.warning("proxy: no FD for '%s'", full_name)
log_libmgr.warning("proxy: no FD for '%s'", name)
return None
else:
raise VamosInternalError("Neither vlib nor alib?!")
log_libmgr.error("proxy: no lib at %08x", base_addr)
return None

def close_lib_proxy(self, proxy, run_sp=None):
"""Close the library assoicated with proxy and invalidate it."""
Expand Down
1 change: 1 addition & 0 deletions amitools/vamos/libstructs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .exec_ import *
from .dos import *
from .util import *
from .timer import *
21 changes: 21 additions & 0 deletions amitools/vamos/libstructs/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from amitools.vamos.astructs import (
AmigaStructDef,
AmigaStruct,
ULONG,
)
from .exec_ import IORequestStruct


# TimeVal
@AmigaStructDef
class TimeValStruct(AmigaStruct):
_format = [(ULONG, "tv_secs"), (ULONG, "tv_micro")]


# TimeRequest
@AmigaStructDef
class TimeRequestStruct(AmigaStruct):
_format = [
(IORequestStruct, "tr_node"),
(TimeValStruct, "tr_time"),
]
4 changes: 3 additions & 1 deletion test/helper/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def task_gen(mode_ctx):
return result

mode = BaseMode("task", task_gen)
exit_codes = main(args=[], mode=mode, single_return_code=False)
exit_codes = main(
args=["-l", "libmgr:info,exec:info"], mode=mode, single_return_code=False
)
assert exit_codes is not None
return exit_codes
12 changes: 12 additions & 0 deletions test/pytask/pytask_self.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import pytest


def pytask_self_simple_task_test(vamos_task):
"""check if pytask feature works"""

Expand Down Expand Up @@ -64,3 +67,12 @@ def task(ctx, task):

exit_codes = vamos_task.run([task])
assert exit_codes[0] == 42


def pytask_self_assert_test(vamos_task):
def task(ctx, task):
# fail test
assert 0 == 1

with pytest.raises(AssertionError):
vamos_task.run([task])
64 changes: 64 additions & 0 deletions test/pytask/pytask_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import time
from amitools.vamos.libstructs import TimeValStruct, TimeRequestStruct
from amitools.vamos.lib.TimerDevice import TimerDevice


def pytask_timer_eclock_test(vamos_task):
"""check GetEClock of Python"""

def task(ctx, task):
# get exec library
exec_proxy = ctx.proxies.get_exec_lib_proxy()

# alloc time struct
tv = TimeValStruct.alloc(ctx.alloc)
assert tv is not None

# alloc timer io req
req = TimeRequestStruct.alloc(ctx.alloc)
assert req is not None

prox = ctx.proxies.open_lib_proxy("timer.device")
print(f"prox: addr={prox.base_addr:08x}")

# open timer device
res = exec_proxy.OpenDevice("timer.device", 0, req.addr, 0)
assert res == 0

# get proxy for device
base_addr = req.node.device.aptr
tdev = ctx.proxies.open_lib_proxy_addr(base_addr)
assert tdev is not None

# call lib func
res = tdev.ReadEClock(tv.addr)
assert res == TimerDevice.ECLOCK_HZ
ts_last = tv.secs.val << 32 | tv.micro.val

for i in range(100):
# call lib func
res = tdev.ReadEClock(tv.addr)
assert res == TimerDevice.ECLOCK_HZ
ts = tv.secs.val << 32 | tv.micro.val

# assert monotonic time stamp
assert ts > ts_last

ts_last = ts

# free proxy
ctx.proxies.close_lib_proxy(tdev)

# close device
exec_proxy.CloseDevice(req.addr)

# free req
req.free()

# free timeval
tv.free()

return 0

exit_codes = vamos_task.run([task])
assert exit_codes == [0]
5 changes: 5 additions & 0 deletions test/unit/libcore_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def PrintString(self, **kwargs):
self.string_count += 1
self.string_kwargs = kwargs
self.string_reg_a0 = self.ctx.cpu.r_reg(REG_A0)
self.string_txt = self.ctx.mem.r_cstr(self.string_reg_a0)
self.ctx.cpu.w_reg(REG_D0, self.string_count)
self.ctx.cpu.w_reg(REG_D1, 2 * self.string_count)

Expand Down Expand Up @@ -86,19 +87,23 @@ def libcore_proxy_gen_stub_test():
assert stub.hello_kwargs == {"what": "why"}
assert ctx.cpu.r_reg(REG_D0) == stub.hello_count
# call string
ctx.mem.w_cstr(0x10, "hello, world!")
assert stub.string_count == 0
ret = proxy.PrintString(0x10, ret_d1=True)
assert ret == (1, 2)
assert stub.string_count == 1
assert stub.string_reg_a0 == 0x10
assert stub.string_txt == "hello, world!"
assert ctx.cpu.r_reg(REG_D0) == stub.string_count
assert ctx.cpu.r_reg(REG_D1) == stub.string_count * 2
# call string with kwargs
ctx.mem.w_cstr(0x20, "hi!")
assert stub.string_count == 1
ret = proxy.PrintString(0x20, ret_d1=True, foo="bar")
assert ret == (2, 4)
assert stub.string_count == 2
assert stub.string_reg_a0 == 0x20
assert stub.string_txt == "hi!"
assert stub.string_kwargs == {"foo": "bar"}
assert ctx.cpu.r_reg(REG_D0) == stub.string_count
assert ctx.cpu.r_reg(REG_D1) == stub.string_count * 2
Expand Down

0 comments on commit 3c7b74d

Please sign in to comment.