Skip to content

Commit

Permalink
shuffle a bit. Add more config.
Browse files Browse the repository at this point in the history
  • Loading branch information
dudo committed Dec 19, 2023
1 parent 59ca8fc commit 2b6d57f
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 178 deletions.
4 changes: 4 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
PIXIE_URL="127.0.0.1:12345"
PIXIE_STREAM_SLEEP=10
PIXIE_ERROR_MAX=3
PXL_FILE_PATH="./config/config.pxl"
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ RUN groupadd -r nonroot && useradd --no-log-init -r -g nonroot nonroot

# Build the binary with full module support and without Cgo.
# Compile the binary statically including all dependencies.
RUN CGO_ENABLED=0 GOOS=linux go build -mod=readonly -a -installsuffix cgo -o /go/bin/main .
RUN CGO_ENABLED=0 GOOS=linux go build -mod=readonly -a -installsuffix cgo -o /go/bin/main ./cmd/observer

# Second stage: build the runtime container.
# Start from a scratch image, which is an empty container.
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
# Observer

Meant to be paired with Pixie's [standalone PEM](https://github.com/pixie-io/pixie/tree/main/src/experimental/standalone_pem).

## Usage

Install via [Helm](https://github.com/orbservability/helm-charts/tree/main/charts/observer) if you're using Kubernetes.

If you're loading this manually, add your PxL script at $PXL_FILE_PATH, and point to the PEM via $PIXIE_URL.

## Development

```sh
docker compose build
docker compose run --rm go mod tidy
Expand Down
76 changes: 76 additions & 0 deletions cmd/observer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"context"
"io"
"log"
"time"

"px.dev/pxapi"
"px.dev/pxapi/errdefs"

"orbservability/observer/pkg/config"
)

func main() {
ctx := context.Background()

cfg, err := config.NewConfig()
if err != nil {
log.Fatal("Error with Config:", err)
}

// Create a Pixie client with local standalonePEM listening address
client, err := pxapi.NewClient(
ctx,
pxapi.WithDirectAddr(cfg.PixieURL),
pxapi.WithDirectCredsInsecure(),
)
if err != nil {
log.Fatalf("Failed to create Pixie client: %v", err)
}

// Create a connection to the host.
hostID := "localhost"
vz, err := client.NewVizierClient(ctx, hostID)
if err != nil {
log.Fatalf("Failed to create Vizier client: %v", err)
}

// Create TableMuxer to accept results table.
tm := &tableMux{}

executionErrorCount := 0

for {
// Execute the PxL script and check for resultSet
resultSet, err := vz.ExecuteScript(ctx, cfg.PxL, tm)
if err != nil {
executionErrorCount += 1
if executionErrorCount > cfg.MaxErrorCount {
log.Fatalf("Failed to execute PxL script: %v", err)
} else {
time.Sleep(time.Second * time.Duration(cfg.PixieStreamSleep))
continue
}
}

for {
// Receive the PxL script results.
err := resultSet.Stream()
if err != nil {
if err == io.EOF || err.Error() == "stream has already been closed" {
// End of stream or stream closed, break to reopen stream
break
}
if errdefs.IsCompilationError(err) {
log.Fatalf("Compilation error: %v", err)
}

break
}
}
resultSet.Close()
time.Sleep(time.Second * time.Duration(cfg.PixieStreamSleep))
}
}
75 changes: 75 additions & 0 deletions cmd/observer/pxapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"

"px.dev/pxapi"
"px.dev/pxapi/errdefs"
"px.dev/pxapi/types"
)

// Satisfies the TableRecordHandler interface.
type tablePrinter struct {
headerValues []string // A slice of strings to hold column names
}

func (t *tablePrinter) HandleInit(ctx context.Context, metadata types.TableMetadata) error {
// Store column names in order
for _, col := range metadata.ColInfo {
t.headerValues = append(t.headerValues, col.Name)
}
return nil
}

func (t *tablePrinter) HandleRecord(ctx context.Context, r *types.Record) error {
if len(r.Data) != len(t.headerValues) {
return fmt.Errorf("%w: mismatch in header and data sizes", errdefs.ErrInvalidArgument)
}

recordMap := make(map[string]interface{})

for i, d := range r.Data {
var value interface{}

switch v := d.(type) {
case *types.BooleanValue:
value = v.Value()
case *types.Int64Value:
value = v.Value()
case types.Float64Value:
value = v.Value()
case *types.Time64NSValue:
value = v.Value()
case *types.UInt128Value:
value = v.Value()
default:
// Fallback to string representation if type is unknown
value = d.String()
}

recordMap[t.headerValues[i]] = value
}

jsonRecord, err := json.Marshal(recordMap)
if err != nil {
log.Printf("Error marshaling record to JSON: %s", err)
return err
}

fmt.Println(string(jsonRecord))
return nil
}

func (t *tablePrinter) HandleDone(ctx context.Context) error {
return nil
}

// Satisfies the TableMuxer interface.
type tableMux struct{}

func (s *tableMux) AcceptTable(ctx context.Context, metadata types.TableMetadata) (pxapi.TableRecordHandler, error) {
return &tablePrinter{}, nil
}
177 changes: 0 additions & 177 deletions main.go

This file was deleted.

Loading

0 comments on commit 2b6d57f

Please sign in to comment.