diff --git a/canlink/busmanager.go b/canlink/busmanager.go new file mode 100644 index 0000000..520723e --- /dev/null +++ b/canlink/busmanager.go @@ -0,0 +1,242 @@ +// Package canlink provides utilities to interact with a +// Controller Area Network (CAN Bus). +package canlink + +import ( + "context" + "errors" + "net" + "sync" + "time" + + "go.einride.tech/can/pkg/socketcan" + "go.uber.org/zap" +) + +// Default buffered channel length for the broadcast and +// incoming receive channels. +// +// In order to preserve the real time broadcast requirement, +// BusManager will drop messages for any Handler that is +// unable to keep up with the broadcast rate. +const _channelBufferLength = 1000 + +// BusManager is a centralized node responsible for orchestrating +// all interactions with a CAN bus. +// +// It acts as a message broker supporting the transmission +// of bus traffic to registered handlers and receiving incoming +// messages from these handlers to write to the bus. +// +// BusManager uses SocketCAN on the Linux platform. Note that +// it does not manage the lifetime of the network socket connection. +// +// Example: +// +// package main +// +// import ( +// "context" +// "fmt" +// "time" +// +// "go.einride.tech/can/pkg/socketcan" +// "go.uber.org/zap" +// ) +// +// func main () { +// ctx := context.Background() +// +// loggerConfig := zap.NewDevelopmentConfig() +// logger, err := loggerConfig.Build() +// +// // Create a network connection for vcan0 +// conn, err := socketcan.DialContext(context.Background(), "can", "vcan0") +// if err != nil { +// return +// } +// +// manager := canlink.NewBusManager(logger, conn) +// handler = NewHandler(...) +// +// broadcast, transmit := manager.Register(handler) +// +// handler.Handle(...) +// +// manager.Start(ctx) +// +// ... +// +// manager.Stop() +// manager.Close() +type BusManager struct { + broadcastChan map[Handler]chan TimestampedFrame + incomingChan chan TimestampedFrame + + receiver *socketcan.Receiver + transmitter *socketcan.Transmitter + + l *zap.Logger + stop chan struct{} + isRunning bool + mu sync.Mutex +} + +// NewBusManager returns a BusManager object. +// +// The network connection is injected into the BusManager +// and provides the interface for a single bus. +// +// See usage example. +func NewBusManager(l *zap.Logger, conn *net.Conn) *BusManager { + busManager := &BusManager{ + l: l.Named("Bus Manager"), + + broadcastChan: make(map[Handler]chan TimestampedFrame), + incomingChan: make(chan TimestampedFrame, _channelBufferLength), + + receiver: socketcan.NewReceiver(*conn), + transmitter: socketcan.NewTransmitter(*conn), + } + + return busManager +} + +// Register a Handler with the BusManager. +// +// Register returns two channels for the specified Handler. +// The broadcast channel is a stream of traffic received from +// the bus. The incoming channel enables the Handler to write +// a frame to the bus. +// +// The channels operate on a TimestampedFrame object. +func (b *BusManager) Register( + handler Handler, +) (chan TimestampedFrame, chan TimestampedFrame) { + b.mu.Lock() + defer b.mu.Unlock() + + b.l.Info("registering handler") + + subscription := make(chan TimestampedFrame, _channelBufferLength) + b.broadcastChan[handler] = subscription + + b.l.Info("registered handler") + + return subscription, b.incomingChan +} + +// Unregister a Handler from the BusManager. +// +// Deletes the broadcast channel that was previously +// provided from BusManager. +func (b *BusManager) Unregister(handler *Handler) { + b.mu.Lock() + defer b.mu.Unlock() + + delete(b.broadcastChan, *handler) +} + +// Start the traffic broadcast and incoming frame listener +// for each of the registered handlers. +// +// The broadcast stream will begin. If handlers cannot keep up +// with the broadcast, frames for that handler will be dropped. +// +// The handlers can now send CAN frames to the BusManager to be +// transmitted out onto the bus. +func (b *BusManager) Start(ctx context.Context) { + b.l.Info("start broadcast and incoming") + + if b.isRunning { + return + } + + b.stop = make(chan struct{}) + + go b.broadcast(ctx) + go b.processIncoming(ctx) + + b.isRunning = true +} + +// Stop the traffic broadcast and incoming frame listener. +// +// Preserves registered handlers and their assosciated channels. +func (b *BusManager) Stop() { + b.l.Info("stop broadcast and incoming") + + if !b.isRunning { + return + } + + close(b.stop) + b.isRunning = false +} + +// Close cleans up the bus network connection. +func (b *BusManager) Close(conn net.Conn) error { + b.l.Info("closing socketcan receiver and transmitter") + + if b.isRunning { + return errors.New("cannot close active bus manager") + } + + b.receiver.Close() + b.transmitter.Close() + + return nil +} + +func (b *BusManager) broadcast(ctx context.Context) { + timeFrame := TimestampedFrame{} + + for b.receiver.Receive() { + timeFrame.Frame = b.receiver.Frame() + timeFrame.Time = time.Now() + + b.mu.Lock() + + for handler, ch := range b.broadcastChan { + select { + case <-ctx.Done(): + b.l.Info("context deadline exceeded") + return + case _, ok := <-ch: + if !ok { + b.l.Info("broadcast channel closed, exiting broadcast routine") + return + } + case <-b.stop: + b.l.Info("stop signal received") + return + case ch <- timeFrame: + b.l.Info("broadcasted can frame") + default: + b.l.Warn("dropping frames on handler", zap.String("handler", handler.Name())) + } + } + + b.mu.Unlock() + } +} + +func (b *BusManager) processIncoming(ctx context.Context) { + for { + select { + case <-ctx.Done(): + b.l.Info("context deadline exceeded") + return + case <-b.stop: + b.l.Info("stop signal received") + return + case frame, ok := <-b.incomingChan: + if !ok { + b.l.Info("incoming channel closed, exiting process routine") + return + } + + b.transmitter.TransmitFrame(ctx, frame.Frame) + } + } +} diff --git a/canlink/handler.go b/canlink/handler.go new file mode 100644 index 0000000..37d9961 --- /dev/null +++ b/canlink/handler.go @@ -0,0 +1,6 @@ +package canlink + +type Handler interface { + Name() string + Handle(chan TimestampedFrame, chan TimestampedFrame) +} diff --git a/canlink/timestampedframe.go b/canlink/timestampedframe.go index cc5b2a8..ff5fff5 100644 --- a/canlink/timestampedframe.go +++ b/canlink/timestampedframe.go @@ -6,7 +6,7 @@ import ( "go.einride.tech/can" ) -// TimestampedFrame contains a single CAN frame along with the time it was received +// TimestampedFrame contains a single CAN frame along with the time it was received. type TimestampedFrame struct { Frame can.Frame Time time.Time diff --git a/cmd/busmanager/busmanager b/cmd/busmanager/busmanager new file mode 100755 index 0000000..ace4650 Binary files /dev/null and b/cmd/busmanager/busmanager differ diff --git a/cmd/busmanager/main.go b/cmd/busmanager/main.go new file mode 100644 index 0000000..7fe34c1 --- /dev/null +++ b/cmd/busmanager/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "fmt" + "time" + + "go.einride.tech/can/pkg/socketcan" + "go.uber.org/zap" + + "github.com/macformula/hil/canlink" +) + +type Handler struct { + name string +} + +func NewHandler() *Handler { + handler := &Handler{ + name: "testHandler", + } + + return handler +} + +func (h *Handler) Handle( + broadcast chan canlink.TimestampedFrame, + transmit chan canlink.TimestampedFrame, +) { + go func() { + for { + select { + case frame := <-broadcast: + fmt.Println("RECEIVED: ", frame.Frame) + default: + } + } + }() + + go func() { + var i byte + + for { + time.Sleep(2 * time.Millisecond) + + frame := canlink.TimestampedFrame{} + copy(frame.Frame.Data[:], []byte{i}) + frame.Time = time.Now() + + i = i + 1 + + transmit <- frame + } + }() +} + +func (h *Handler) Name() string { + return "Handler" +} + +func main() { + ctx := context.Background() + + loggerConfig := zap.NewDevelopmentConfig() + logger, err := loggerConfig.Build() + + conn, err := socketcan.DialContext(context.Background(), "can", "vcan0") + if err != nil { + logger.Error("failed to create socket can connection", + zap.String("can_interface", "vcan0"), + zap.Error(err), + ) + return + } + + manager := canlink.NewBusManager(logger, &conn) + handler := NewHandler() + + broadcast, transmit := manager.Register(handler) + + handler.Handle(broadcast, transmit) + + manager.Start(ctx) + + for { + } +} diff --git a/cmd/tracetest/main.go b/cmd/tracetest/main.go index 10d2b6e..09c872b 100644 --- a/cmd/tracetest/main.go +++ b/cmd/tracetest/main.go @@ -7,12 +7,12 @@ import ( "os/signal" "time" + "github.com/pkg/errors" "go.einride.tech/can/pkg/socketcan" "go.uber.org/zap" "github.com/macformula/hil/canlink" "github.com/macformula/hil/macformula/cangen/vehcan" - "github.com/pkg/errors" ) const ( @@ -144,7 +144,12 @@ func waitForSigTerm(stop chan struct{}, logger *zap.Logger) { } func startSendMessageRoutine( - ctx context.Context, stop chan struct{}, msgPeriod time.Duration, cc *canlink.CanClient, l *zap.Logger) { + ctx context.Context, + stop chan struct{}, + msgPeriod time.Duration, + cc *canlink.CanClient, + l *zap.Logger, +) { packState := vehcan.NewPack_State() packState.SetPopulated_Cells(_numCells) packState.SetPack_Current(0) @@ -179,7 +184,11 @@ func startSendMessageRoutine( // +packCurrentDeviation on even i, -packCurrentDeviation on odd i packCurrentDeviation := _packCurrentDeviation * float64(i%2+1) * (-1) packCurrentIncr := float64(msgPeriod/time.Second) * _packCurrentIncrPerSec - packCurrent := clamp(packState.Pack_Current()+packCurrentIncr, _minPackCurrent, _maxPackCurrent) + packCurrent := clamp( + packState.Pack_Current()+packCurrentIncr, + _minPackCurrent, + _maxPackCurrent, + ) packCurrent += packCurrentDeviation packState.SetPack_Current(packCurrent) } else {