Skip to content

Commit

Permalink
Switch to streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
dudo committed Jan 10, 2024
1 parent 0a949f4 commit 13e0b56
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 202 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ RUN groupadd -r nonroot && useradd --no-log-init -r -g nonroot nonroot

# Build the binary with full module support and without Cgo.
# Compile the binary statically including all dependencies.
RUN CGO_ENABLED=0 GOOS=linux go build -mod=readonly -a -installsuffix cgo -o /go/bin/main ./cmd/observer
# RUN CGO_ENABLED=0 GOOS=linux go build -mod=readonly -a -installsuffix cgo -o /go/bin/main ./cmd/observer

# Second stage: build the runtime container.
# Start from a scratch image, which is an empty container.
Expand Down
66 changes: 14 additions & 52 deletions cmd/observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,37 @@ package main

import (
"context"
"io"
"log"
"time"

"px.dev/pxapi"
"px.dev/pxapi/errdefs"

"orbservability/observer/pkg/config"
)

func main() {
ctx := context.Background()

// Load Config
cfg, err := config.NewConfig()
if err != nil {
log.Fatal("Error with Config:", err)
log.Fatal("Error loading config: ", err)
}

// Create a Pixie client with local standalonePEM listening address
client, err := pxapi.NewClient(
ctx,
pxapi.WithDirectAddr(cfg.PixieURL),
pxapi.WithDirectCredsInsecure(),
)
// Create a Pixie client
pixieClient, err := createPixieClient(ctx, cfg)
if err != nil {
log.Fatalf("Failed to create Pixie client: %v", err)
log.Fatal("Error creating Pixie client: ", err)
}

// Create a connection to the host.
hostID := "localhost"
vz, err := client.NewVizierClient(ctx, hostID)
// Establish a connection to the gRPC server
grpcConn, grpcStream, err := createGrpcStream(ctx, cfg)
if err != nil {
log.Fatalf("Failed to create Vizier client: %v", err)
log.Fatal("Error creating gRPC stream: ", err)
}
defer grpcConn.Close()
defer grpcStream.CloseAndRecv()

// Create TableMuxer to accept results table.
tm := &tableMux{}

executionErrorCount := 0

for {
// Execute the PxL script and check for resultSet
resultSet, err := vz.ExecuteScript(ctx, cfg.PxL, tm)
if err != nil {
executionErrorCount += 1
if executionErrorCount > cfg.MaxErrorCount {
log.Fatalf("Failed to execute PxL script: %v", err)
} else {
time.Sleep(time.Second * time.Duration(cfg.PixieStreamSleep))
continue
}
}

for {
// Receive the PxL script results.
err := resultSet.Stream()
if err != nil {
if err == io.EOF || err.Error() == "stream has already been closed" {
// End of stream or stream closed, break to reopen stream
break
}
if errdefs.IsCompilationError(err) {
log.Fatalf("Compilation error: %v", err)
}

break
}
}
resultSet.Close()
time.Sleep(time.Second * time.Duration(cfg.PixieStreamSleep))
// Execute PxL scripts and handle records
tm := &tableMux{grpcStream: grpcStream}
if err := executeAndStream(ctx, pixieClient, cfg, tm); err != nil {
log.Fatal("Error handling records: ", err)
}
}
77 changes: 77 additions & 0 deletions cmd/observer/orbservability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package main

import (
"context"
"fmt"
"orbservability/observer/pkg/config"
"reflect"

pb "github.com/orbservability/schemas/v1"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func createGrpcStream(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, pb.EventGatewayService_StreamEventsClient, error) {
conn, err := grpc.Dial(cfg.OrbservabilityURL, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, err
}

client := pb.NewEventGatewayServiceClient(conn)
stream, err := client.StreamEvents(ctx)
if err != nil {
return nil, nil, err
}

return conn, stream, nil
}

func streamEvent(stream pb.EventGatewayService_StreamEventsClient, recordMap map[string]interface{}) error {
msg := &pb.PixieEvent{}
err := mapToProto(recordMap, msg)
if err != nil {
return err
}

if err := stream.Send(msg); err != nil {
return err
}

return nil
}

func mapToProto(recordMap map[string]interface{}, msg *pb.PixieEvent) error {
msgVal := reflect.ValueOf(msg).Elem()

for key, value := range recordMap {
field := msgVal.FieldByName(key)
if !field.IsValid() || !field.CanSet() {
continue // Skip invalid or unsettable fields
}

fieldValue := reflect.ValueOf(value)

// Check if the field is a nested message
if field.Kind() == reflect.Struct && fieldValue.Kind() == reflect.Map {
nestedMap, ok := value.(map[string]interface{})
if !ok {
return fmt.Errorf("expected map for nested field %s", key)
}

// Recursively call mapToProto for the nested object
err := mapToProto(nestedMap, field.Addr().Interface().(*pb.PixieEvent))
if err != nil {
return fmt.Errorf("error setting nested field %s: %v", key, err)
}

} else if field.Type() == fieldValue.Type() {
field.Set(fieldValue)
} else {
// Handle type conversion or return an error
return fmt.Errorf("type mismatch for field %s", key)
}
}

return nil
}
62 changes: 62 additions & 0 deletions cmd/observer/pixie.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"io"
"orbservability/observer/pkg/config"
"time"

"px.dev/pxapi"
"px.dev/pxapi/errdefs"
)

func createPixieClient(ctx context.Context, cfg *config.Config) (*pxapi.Client, error) {
return pxapi.NewClient(
ctx,
pxapi.WithDirectAddr(cfg.PixieURL),
pxapi.WithDirectCredsInsecure(),
)
}

func executeAndStream(ctx context.Context, client *pxapi.Client, cfg *config.Config, tm *tableMux) error {
vz, err := client.NewVizierClient(ctx, cfg.VizierHost)
if err != nil {
return err
}

executionErrorCount := 0
for {
resultSet, err := vz.ExecuteScript(ctx, cfg.PxL, tm)
if err != nil {
executionErrorCount++
if executionErrorCount > cfg.MaxErrorCount {
return err
}
time.Sleep(time.Second * time.Duration(cfg.PixieStreamSleep))
continue
}
defer resultSet.Close()

if err := streamResults(resultSet); err != nil {
return err
}

time.Sleep(time.Second * time.Duration(cfg.PixieStreamSleep))
}
}

func streamResults(resultSet *pxapi.ScriptResults) error {
for {
err := resultSet.Stream()
if err != nil {
if err == io.EOF || err.Error() == "stream has already been closed" {
return nil // End of stream or stream closed, return successfully
}
if errdefs.IsCompilationError(err) {
return err // Unrecoverable error
}

return nil // Unknown error, return successfully for retries
}
}
}
21 changes: 9 additions & 12 deletions cmd/observer/pxapi.go → cmd/observer/table_printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log"

pb "github.com/orbservability/schemas/v1"
"px.dev/pxapi"
"px.dev/pxapi/errdefs"
"px.dev/pxapi/types"
Expand All @@ -14,6 +13,7 @@ import (
// Satisfies the TableRecordHandler interface.
type tablePrinter struct {
headerValues []string // A slice of strings to hold column names
grpcStream pb.EventGatewayService_StreamEventsClient
}

func (t *tablePrinter) HandleInit(ctx context.Context, metadata types.TableMetadata) error {
Expand Down Expand Up @@ -53,23 +53,20 @@ func (t *tablePrinter) HandleRecord(ctx context.Context, r *types.Record) error
recordMap[t.headerValues[i]] = value
}

jsonRecord, err := json.Marshal(recordMap)
if err != nil {
log.Printf("Error marshaling record to JSON: %s", err)
return err
}

fmt.Println(string(jsonRecord))
return nil
return streamEvent(t.grpcStream, recordMap)
}

func (t *tablePrinter) HandleDone(ctx context.Context) error {
return nil
}

// Satisfies the TableMuxer interface.
type tableMux struct{}
type tableMux struct {
grpcStream pb.EventGatewayService_StreamEventsClient
}

func (s *tableMux) AcceptTable(ctx context.Context, metadata types.TableMetadata) (pxapi.TableRecordHandler, error) {
return &tablePrinter{}, nil
return &tablePrinter{
grpcStream: s.grpcStream,
}, nil
}
23 changes: 13 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
module orbservability/observer

go 1.21
go 1.21.6

require px.dev/pxapi v0.5.0
require (
github.com/orbservability/schemas v0.3.2
google.golang.org/grpc v1.60.1
px.dev/pxapi v0.5.0
)

require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
github.com/lestrrat-go/blackmagic v1.0.0 // indirect
github.com/lestrrat-go/httpcc v1.0.0 // indirect
github.com/lestrrat-go/iter v1.0.1 // indirect
github.com/lestrrat-go/jwx v1.2.17 // indirect
github.com/lestrrat-go/option v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.16.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/protobuf v1.32.0 // indirect
)
Loading

0 comments on commit 13e0b56

Please sign in to comment.