Skip to content

Commit

Permalink
97: Add Bus Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
langei committed Nov 23, 2024
1 parent 0847a22 commit 59fec51
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 4 deletions.
242 changes: 242 additions & 0 deletions canlink/busmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Package canlink provides utilities to interact with a
// Controller Area Network (CAN Bus).
package canlink

import (
"context"
"errors"
"net"
"sync"
"time"

"go.einride.tech/can/pkg/socketcan"
"go.uber.org/zap"
)

// Default buffered channel length for the broadcast and
// incoming receive channels.
//
// In order to preserve the real time broadcast requirement,
// BusManager will drop messages for any Handler that is
// unable to keep up with the broadcast rate.
const _channelBufferLength = 1000

// BusManager is a centralized node responsible for orchestrating
// all interactions with a CAN bus.
//
// It acts as a message broker supporting the transmission
// of bus traffic to registered handlers and receiving incoming
// messages from these handlers to write to the bus.
//
// BusManager uses SocketCAN on the Linux platform. Note that
// it does not manage the lifetime of the network socket connection.
//
// Example:
//
// package main
//
// import (
// "context"
// "fmt"
// "time"
//
// "go.einride.tech/can/pkg/socketcan"
// "go.uber.org/zap"
// )
//
// func main () {
// ctx := context.Background()
//
// loggerConfig := zap.NewDevelopmentConfig()
// logger, err := loggerConfig.Build()
//
// // Create a network connection for vcan0
// conn, err := socketcan.DialContext(context.Background(), "can", "vcan0")
// if err != nil {
// return
// }
//
// manager := canlink.NewBusManager(logger, conn)
// handler = NewHandler(...)
//
// broadcast, transmit := manager.Register(handler)
//
// handler.Handle(...)
//
// manager.Start(ctx)
//
// ...
//
// manager.Stop()
// manager.Close()
type BusManager struct {
broadcastChan map[Handler]chan TimestampedFrame
incomingChan chan TimestampedFrame

receiver *socketcan.Receiver
transmitter *socketcan.Transmitter

l *zap.Logger
stop chan struct{}
isRunning bool
mu sync.Mutex
}

// NewBusManager returns a BusManager object.
//
// The network connection is injected into the BusManager
// and provides the interface for a single bus.
//
// See usage example.
func NewBusManager(l *zap.Logger, conn *net.Conn) *BusManager {
busManager := &BusManager{
l: l.Named("Bus Manager"),

broadcastChan: make(map[Handler]chan TimestampedFrame),
incomingChan: make(chan TimestampedFrame, _channelBufferLength),

receiver: socketcan.NewReceiver(*conn),
transmitter: socketcan.NewTransmitter(*conn),
}

return busManager
}

// Register a Handler with the BusManager.
//
// Register returns two channels for the specified Handler.
// The broadcast channel is a stream of traffic received from
// the bus. The incoming channel enables the Handler to write
// a frame to the bus.
//
// The channels operate on a TimestampedFrame object.
func (b *BusManager) Register(
handler Handler,
) (chan TimestampedFrame, chan TimestampedFrame) {
b.mu.Lock()
defer b.mu.Unlock()

b.l.Info("registering handler")

subscription := make(chan TimestampedFrame, _channelBufferLength)
b.broadcastChan[handler] = subscription

b.l.Info("registered handler")

return subscription, b.incomingChan
}

// Unregister a Handler from the BusManager.
//
// Deletes the broadcast channel that was previously
// provided from BusManager.
func (b *BusManager) Unregister(handler *Handler) {
b.mu.Lock()
defer b.mu.Unlock()

delete(b.broadcastChan, *handler)
}

// Start the traffic broadcast and incoming frame listener
// for each of the registered handlers.
//
// The broadcast stream will begin. If handlers cannot keep up
// with the broadcast, frames for that handler will be dropped.
//
// The handlers can now send CAN frames to the BusManager to be
// transmitted out onto the bus.
func (b *BusManager) Start(ctx context.Context) {
b.l.Info("start broadcast and incoming")

if b.isRunning {
return
}

b.stop = make(chan struct{})

go b.broadcast(ctx)
go b.processIncoming(ctx)

b.isRunning = true
}

// Stop the traffic broadcast and incoming frame listener.
//
// Preserves registered handlers and their assosciated channels.
func (b *BusManager) Stop() {
b.l.Info("stop broadcast and incoming")

if !b.isRunning {
return
}

close(b.stop)
b.isRunning = false
}

// Close cleans up the bus network connection.
func (b *BusManager) Close(conn net.Conn) error {
b.l.Info("closing socketcan receiver and transmitter")

if b.isRunning {
return errors.New("cannot close active bus manager")
}

b.receiver.Close()
b.transmitter.Close()

return nil
}

func (b *BusManager) broadcast(ctx context.Context) {
timeFrame := TimestampedFrame{}

for b.receiver.Receive() {
timeFrame.Frame = b.receiver.Frame()
timeFrame.Time = time.Now()

b.mu.Lock()

for handler, ch := range b.broadcastChan {
select {
case <-ctx.Done():
b.l.Info("context deadline exceeded")
return
case _, ok := <-ch:
if !ok {
b.l.Info("broadcast channel closed, exiting broadcast routine")
return
}
case <-b.stop:
b.l.Info("stop signal received")
return
case ch <- timeFrame:
b.l.Info("broadcasted can frame")
default:
b.l.Warn("dropping frames on handler", zap.String("handler", handler.Name()))
}
}

b.mu.Unlock()
}
}

func (b *BusManager) processIncoming(ctx context.Context) {
for {
select {
case <-ctx.Done():
b.l.Info("context deadline exceeded")
return
case <-b.stop:
b.l.Info("stop signal received")
return
case frame, ok := <-b.incomingChan:
if !ok {
b.l.Info("incoming channel closed, exiting process routine")
return
}

b.transmitter.TransmitFrame(ctx, frame.Frame)
}
}
}
6 changes: 6 additions & 0 deletions canlink/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package canlink

type Handler interface {
Name() string
Handle(chan TimestampedFrame, chan TimestampedFrame)
}
2 changes: 1 addition & 1 deletion canlink/timestampedframe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"go.einride.tech/can"
)

// TimestampedFrame contains a single CAN frame along with the time it was received
// TimestampedFrame contains a single CAN frame along with the time it was received.
type TimestampedFrame struct {
Frame can.Frame
Time time.Time
Expand Down
Binary file added cmd/busmanager/busmanager
Binary file not shown.
87 changes: 87 additions & 0 deletions cmd/busmanager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"fmt"
"time"

"go.einride.tech/can/pkg/socketcan"
"go.uber.org/zap"

"github.com/macformula/hil/canlink"
)

type Handler struct {
name string
}

func NewHandler() *Handler {
handler := &Handler{
name: "testHandler",
}

return handler
}

func (h *Handler) Handle(
broadcast chan canlink.TimestampedFrame,
transmit chan canlink.TimestampedFrame,
) {
go func() {
for {
select {
case frame := <-broadcast:
fmt.Println("RECEIVED: ", frame.Frame)
default:
}
}
}()

go func() {
var i byte

for {
time.Sleep(2 * time.Millisecond)

frame := canlink.TimestampedFrame{}
copy(frame.Frame.Data[:], []byte{i})
frame.Time = time.Now()

i = i + 1

transmit <- frame
}
}()
}

func (h *Handler) Name() string {
return "Handler"
}

func main() {
ctx := context.Background()

loggerConfig := zap.NewDevelopmentConfig()
logger, err := loggerConfig.Build()

conn, err := socketcan.DialContext(context.Background(), "can", "vcan0")
if err != nil {
logger.Error("failed to create socket can connection",
zap.String("can_interface", "vcan0"),
zap.Error(err),
)
return
}

manager := canlink.NewBusManager(logger, &conn)
handler := NewHandler()

broadcast, transmit := manager.Register(handler)

handler.Handle(broadcast, transmit)

manager.Start(ctx)

for {
}
}
15 changes: 12 additions & 3 deletions cmd/tracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"os/signal"
"time"

"github.com/pkg/errors"
"go.einride.tech/can/pkg/socketcan"
"go.uber.org/zap"

"github.com/macformula/hil/canlink"
"github.com/macformula/hil/macformula/cangen/vehcan"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -144,7 +144,12 @@ func waitForSigTerm(stop chan struct{}, logger *zap.Logger) {
}

func startSendMessageRoutine(
ctx context.Context, stop chan struct{}, msgPeriod time.Duration, cc *canlink.CanClient, l *zap.Logger) {
ctx context.Context,
stop chan struct{},
msgPeriod time.Duration,
cc *canlink.CanClient,
l *zap.Logger,
) {
packState := vehcan.NewPack_State()
packState.SetPopulated_Cells(_numCells)
packState.SetPack_Current(0)
Expand Down Expand Up @@ -179,7 +184,11 @@ func startSendMessageRoutine(
// +packCurrentDeviation on even i, -packCurrentDeviation on odd i
packCurrentDeviation := _packCurrentDeviation * float64(i%2+1) * (-1)
packCurrentIncr := float64(msgPeriod/time.Second) * _packCurrentIncrPerSec
packCurrent := clamp(packState.Pack_Current()+packCurrentIncr, _minPackCurrent, _maxPackCurrent)
packCurrent := clamp(
packState.Pack_Current()+packCurrentIncr,
_minPackCurrent,
_maxPackCurrent,
)
packCurrent += packCurrentDeviation
packState.SetPack_Current(packCurrent)
} else {
Expand Down

0 comments on commit 59fec51

Please sign in to comment.