HTTPBenchmark: Tolerate probe failures when initially waiting for server.

Prior to this change, the benchmark would fail if the HTTP backend server
did not respond as soon as the Kubernetes service became available.

PiperOrigin-RevId: 706155955
EtiennePerot authored and gvisor-bot committed Dec 14, 2024
1 parent 036c9d4 commit 7aa4e8d
Showing 2 changed files with 72 additions and 31 deletions.
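
For context before the diff, here is a minimal, framework-free sketch of the retry pattern the commit message describes: each probe attempt gets its own short timeout, individual probe failures are tolerated, and the wait only gives up once the outer deadline expires. The `probeOnce` helper, the plain-HTTP probe, and the specific timeouts are illustrative assumptions, not part of the gVisor code; the real change below applies the same structure to a Kubernetes wget pod.

// Sketch only: a plain-HTTP stand-in for the wget-pod probe used in the diff below.
package probe

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// probeOnce is a hypothetical single probe attempt: it succeeds only if the
// server answers with 200 OK before ctx expires.
func probeOnce(ctx context.Context, url string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

// waitForServer keeps probing until one attempt succeeds or ctx expires;
// individual failures are tolerated rather than failing the benchmark.
func waitForServer(ctx context.Context, url string) error {
	var lastErr error
	for ctx.Err() == nil {
		attemptCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
		err := probeOnce(attemptCtx, url)
		cancel()
		if err == nil {
			return nil
		}
		lastErr = err
		time.Sleep(time.Second) // Brief pause before the next attempt.
	}
	return fmt.Errorf("server never became ready: %v (last probe error: %v)", ctx.Err(), lastErr)
}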
66 changes: 49 additions & 17 deletions test/kubernetes/benchmarks/httpbench/httpbench.go
@@ -105,16 +105,18 @@ type HTTPBenchmark struct {
 // Run runs the HTTP-based benchmark.
 func (h *HTTPBenchmark) Run(ctx context.Context, t *testing.T) {
 	t.Helper()
-	if err := h.Cluster.WaitForServiceReady(ctx, h.Service); err != nil {
+	serverWaitCtx, serverWaitCancel := context.WithTimeout(ctx, 10*time.Minute)
+	if err := h.Cluster.WaitForServiceReady(serverWaitCtx, h.Service); err != nil {
 		t.Fatalf("Failed to wait for service: %v", err)
 	}
 	ip := testcluster.GetIPFromService(h.Service)
 	if ip == "" {
 		t.Fatalf("did not get valid ip: %s", ip)
 	}
-	if err := h.waitForServer(ctx, ip); err != nil {
+	if err := h.waitForServer(serverWaitCtx, ip); err != nil {
 		t.Fatalf("Failed to wait for server: %v", err)
 	}
+	serverWaitCancel()
 	for _, round := range h.Rounds {
 		qpsText := fmt.Sprintf("%d", round.TargetQPS)
 		if round.TargetQPS == InfiniteQPS {
@@ -146,7 +148,10 @@ func (h *HTTPBenchmark) runRound(ctx context.Context, t *testing.T, round Round,
 	}
 	defer h.Cluster.DeletePod(ctx, client)

-	if err := h.Cluster.WaitForPodCompleted(ctx, client); err != nil {
+	waitCtx, waitCancel := context.WithTimeout(ctx, round.Duration+2*time.Minute)
+	err = h.Cluster.WaitForPodCompleted(waitCtx, client)
+	waitCancel()
+	if err != nil {
 		t.Fatalf("failed to wait for wrk2 pod: %v", err)
 	}

@@ -243,21 +248,48 @@ func (h *HTTPBenchmark) getWgetPod(ip string) *v13.Pod {
 // waitForServer waits for an HTTP server to start responding on the given
 // IP and port.
 func (h *HTTPBenchmark) waitForServer(ctx context.Context, ip string) error {
-	wget, err := h.Cluster.ConfigurePodForClientNodepool(ctx, h.getWgetPod(ip))
-	if err != nil {
-		return fmt.Errorf("failed to configure wget pod for client nodepool: %v", err)
-	}
-	wget, err = h.Cluster.CreatePod(ctx, wget)
-	if err != nil {
-		return fmt.Errorf("failed to create wget pod: %v", err)
-	}
-	defer h.Cluster.DeletePod(ctx, wget)
-	waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Minute)
-	defer waitCancel()
-	if err := h.Cluster.WaitForPodCompleted(waitCtx, wget); err != nil {
-		return fmt.Errorf("failed to wait for HTTP server %s:%d%s: %v", ip, h.Port, h.Path, err)
+	lastPhase := v13.PodUnknown
+	var lastLogs string
+	for ctx.Err() == nil {
+		wget, err := h.Cluster.ConfigurePodForClientNodepool(ctx, h.getWgetPod(ip))
+		if err != nil {
+			return fmt.Errorf("failed to configure wget pod for client nodepool: %w", err)
+		}
+		wget, err = h.Cluster.CreatePod(ctx, wget)
+		if err != nil {
+			return fmt.Errorf("failed to create wget pod: %w", err)
+		}
+		waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Minute)
+		phase, waitErr := h.Cluster.WaitForPodTerminated(waitCtx, wget)
+		waitCtxErr := waitCtx.Err()
+		waitCancel()
+		if waitErr == nil && phase != v13.PodSucceeded {
+			logs, err := h.Cluster.ReadPodLogs(ctx, wget)
+			if err != nil {
+				_ = h.Cluster.DeletePod(ctx, wget) // Best-effort delete.
+				return fmt.Errorf("failed to read wget pod logs: %w", err)
+			}
+			lastLogs = logs
+		}
+		deleteErr := h.Cluster.DeletePod(ctx, wget)
+		if ctx.Err() != nil {
+			break
+		}
+		if waitCtxErr != nil {
+			continue
+		}
+		if waitErr != nil {
+			return fmt.Errorf("failed to wait for wget pod: %w", waitErr)
+		}
+		if deleteErr != nil {
+			return fmt.Errorf("failed to delete wget pod: %w", deleteErr)
+		}
+		if phase == v13.PodSucceeded {
+			return nil
+		}
+		lastPhase = phase
 	}
-	return nil
+	return fmt.Errorf("wget pod still fails after context expiry (last phase: %v; last logs: %q)", lastPhase, lastLogs)
 }

 /*
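
The runRound change above also bounds the wait on the wrk2 client pod by the round duration plus fixed slack. A minimal sketch of that bounded-wait pattern follows; boundedWait and its parameters are illustrative names (only the round.Duration plus 2-minute slack figure comes from the diff).

// Sketch of the bounded-wait pattern from runRound above: the wait may not
// outlive the expected duration plus a fixed slack.
package probe

import (
	"context"
	"time"
)

func boundedWait(ctx context.Context, expected, slack time.Duration, wait func(context.Context) error) error {
	waitCtx, cancel := context.WithTimeout(ctx, expected+slack)
	defer cancel()
	return wait(waitCtx)
}

// Hypothetical use mirroring the diff: waiting for a benchmark client to finish.
//   err := boundedWait(ctx, round.Duration, 2*time.Minute, func(c context.Context) error {
//       return cluster.WaitForPodCompleted(c, clientPod)
//   })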
37 changes: 23 additions & 14 deletions test/kubernetes/testcluster/testcluster.go
@@ -451,18 +451,27 @@ func (t *TestCluster) ReadPodLogs(ctx context.Context, pod *v13.Pod) (string, er

 // WaitForPodRunning is a helper method to wait for a pod to be running.
 func (t *TestCluster) WaitForPodRunning(ctx context.Context, pod *v13.Pod) error {
-	return t.doWaitForPod(ctx, pod, v13.PodRunning)
+	_, err := t.doWaitForPod(ctx, pod, func(p v13.PodPhase) bool { return p == v13.PodRunning })
+	return err
 }

 // WaitForPodCompleted is a helper method to wait for a pod to be completed.
 func (t *TestCluster) WaitForPodCompleted(ctx context.Context, pod *v13.Pod) error {
-	return t.doWaitForPod(ctx, pod, v13.PodSucceeded)
+	_, err := t.doWaitForPod(ctx, pod, func(p v13.PodPhase) bool { return p == v13.PodSucceeded })
+	return err
 }

+// WaitForPodTerminated is a helper method to wait for a pod to exit,
+// whether it succeeded or failed.
+func (t *TestCluster) WaitForPodTerminated(ctx context.Context, pod *v13.Pod) (v13.PodPhase, error) {
+	return t.doWaitForPod(ctx, pod, func(p v13.PodPhase) bool { return p == v13.PodSucceeded || p == v13.PodFailed })
+}
+
 // doWaitForPod waits for a pod to complete based on a given v13.PodPhase.
-func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phase v13.PodPhase) error {
+func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phasePredicate func(v13.PodPhase) bool) (v13.PodPhase, error) {
 	podLogger := log.BasicRateLimitedLogger(5 * time.Minute)
-	startLogTime := time.Now().Add(3 * time.Minute)
+	startTime := time.Now()
+	startLogTime := startTime.Add(3 * time.Minute)

 	var p *v13.Pod
 	var err error
@@ -472,29 +481,29 @@ func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phase v13.
 		select {
 		case <-pollCh.C:
 			if p, err = t.GetPod(ctx, pod); err != nil {
-				return fmt.Errorf("failed to poll pod: %w", err)
+				return v13.PodUnknown, fmt.Errorf("failed to poll pod: %w", err)
 			}
 		case <-ctx.Done():
-			return fmt.Errorf("context expired waiting for pod %q: %w", pod.GetName(), ctx.Err())
+			return v13.PodUnknown, fmt.Errorf("context expired waiting for pod %q: %w", pod.GetName(), ctx.Err())
 		}
 		if p.Status.Reason == v13.PodReasonUnschedulable {
-			return fmt.Errorf("pod %q failed: reason: %q message: %q", pod.GetName(), p.Status.Reason, p.Status.Message)
+			return v13.PodPending, fmt.Errorf("pod %q cannot be scheduled: reason: %q message: %q", p.GetName(), p.Status.Reason, p.Status.Message)
 		}

 		for _, c := range p.Status.Conditions {
 			if strings.Contains(c.Reason, v13.PodReasonUnschedulable) {
-				return fmt.Errorf("pod %q failed: reason: %q message: %q", p.GetName(), c.Reason, c.Message)
+				return v13.PodPending, fmt.Errorf("pod %q cannot be scheduled: reason: %q message: %q", p.GetName(), c.Reason, c.Message)
 			}
 		}

-		switch p.Status.Phase {
-		case v13.PodFailed:
-			return fmt.Errorf("pod %q failed: %s", pod.GetName(), p.Status.Message)
-		case phase:
-			return nil
+		if phasePredicate(p.Status.Phase) {
+			return p.Status.Phase, nil
 		}
+		if p.Status.Phase == v13.PodFailed {
+			return v13.PodFailed, fmt.Errorf("pod %q failed: %s", p.GetName(), p.Status.Message)
+		}
 		if time.Now().After(startLogTime) {
-			podLogger.Infof("Still waiting for pod %q after %v; pod status: %v", pod.GetName(), time.Since(startLogTime), p.Status)
+			podLogger.Infof("Still waiting for pod %q after %v; pod status: %v", p.GetName(), time.Since(startTime), p.Status)
 		}
 	}
 }
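
The key behavioral point in doWaitForPod's new form is ordering: the caller's predicate is checked before the generic PodFailed check, so WaitForPodTerminated returns (PodFailed, nil) rather than an error, which is what lets waitForServer treat a failed probe pod as a retryable outcome. Below is a self-contained sketch of that predicate-based wait; the pod phases and the getPhase poller are stand-ins for the Kubernetes types and TestCluster.GetPod, not the real API.

// Sketch of the predicate-based wait: poll until the phase satisfies pred,
// checking pred before the failure case so a terminal-failure phase can be
// reported as a phase rather than an error when the caller asks for it.
package waitsketch

import (
	"context"
	"errors"
	"time"
)

type podPhase string

const (
	podSucceeded podPhase = "Succeeded"
	podFailed    podPhase = "Failed"
	podUnknown   podPhase = "Unknown"
)

func doWaitForPod(ctx context.Context, getPhase func() podPhase, pred func(podPhase) bool) (podPhase, error) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return podUnknown, ctx.Err()
		}
		phase := getPhase()
		if pred(phase) {
			return phase, nil // Checked first: a "terminated" waiter accepts podFailed here.
		}
		if phase == podFailed {
			return podFailed, errors.New("pod failed") // Other waiters still treat failure as an error.
		}
	}
}

// The wrappers in the diff differ only in their predicate:
//   running:    func(p v13.PodPhase) bool { return p == v13.PodRunning }
//   completed:  func(p v13.PodPhase) bool { return p == v13.PodSucceeded }
//   terminated: func(p v13.PodPhase) bool { return p == v13.PodSucceeded || p == v13.PodFailed }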
