Skip to content

Commit

Permalink
[DP-1774] - topicctl get partitions to display under replicated and o…
Browse files Browse the repository at this point in the history
…ffline (#155)

* [DP-1774] - topicctl get partitions to display under replicated and offline

* [DP-1774] - topicctl get partitions to display under replicated and offline

* [DP-1774] - topicctl get partitions to display under replicated and offline

* [DP-1774] - topicctl get partitions to display under replicated and offline

* MInor code fixtures

* Minor fixtures. Modifying get metadata from all topics to nil

* Minor fixtures. Modifying get metadata from all topics to nil
  • Loading branch information
ssingudasu authored Oct 9, 2023
1 parent 0075b97 commit 01c619c
Show file tree
Hide file tree
Showing 9 changed files with 634 additions and 15 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ topicctl repl --cluster-config=examples/local-cluster/cluster.yaml
```
get brokers
get topics
get partitions
get partitions topic-default
get offsets topic-default
tail topic-default
Expand Down Expand Up @@ -171,7 +172,7 @@ resource type in the cluster. Currently, the following operations are supported:
| `get groups` | All consumer groups in the cluster |
| `get lags [topic] [group]` | Lag for each topic partition for a consumer group |
| `get members [group]` | Details of each member in a consumer group |
| `get partitions [topic]` | All partitions in a topic |
| `get partitions [optional: topics]` | Get all partitions for topics |
| `get offsets [topic]` | Number of messages per partition along with start and end times |
| `get topics` | All topics in the cluster |

Expand Down
46 changes: 41 additions & 5 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package subcmd

import (
"context"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -31,6 +33,15 @@ type getCmdConfig struct {

var getConfig getCmdConfig

type partitionsCmdConfig struct {
status admin.PartitionStatus
summary bool
}

var partitionsConfig partitionsCmdConfig

var partitionsStatusHelpText = "Allowed values: ok, offline, under-replicated"

func init() {
getCmd.PersistentFlags().BoolVar(
&getConfig.full,
Expand Down Expand Up @@ -211,10 +222,10 @@ func membersCmd() *cobra.Command {
}

func partitionsCmd() *cobra.Command {
return &cobra.Command{
Use: "partitions [topic]",
Short: "Displays partition information for the specified topic.",
Args: cobra.ExactArgs(1),
partitionsCommand := &cobra.Command{
Use: "partitions [optional: topics]",
Short: "Get all partitions information for topics",
Args: cobra.MinimumNArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())
Expand All @@ -225,10 +236,35 @@ func partitionsCmd() *cobra.Command {
}
defer adminClient.Close()

topics := []string{}
for _, arg := range args {
topics = append(topics, arg)
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetPartitions(ctx, args[0])
return cliRunner.GetPartitions(
ctx,
topics,
partitionsConfig.status,
partitionsConfig.summary,
)
},
}

partitionsCommand.Flags().Var(
&partitionsConfig.status,
"status",
fmt.Sprintf("partition status\n%s", partitionsStatusHelpText),
)

partitionsCommand.Flags().BoolVar(
&partitionsConfig.summary,
"summary",
false,
fmt.Sprintf("Display summary of partitions"),
)

return partitionsCommand
}

func offsetsCmd() *cobra.Command {
Expand Down
17 changes: 17 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,20 @@ func configEntriesToAPIConfigs(

return apiConfigs
}

func (c *BrokerAdminClient) GetAllTopicsMetadata(
ctx context.Context,
) (*kafka.MetadataResponse, error) {
client := c.GetConnector().KafkaClient
req := kafka.MetadataRequest{
Topics: nil,
}

log.Debugf("Metadata request: %+v", req)
metadata, err := client.Metadata(ctx, &req)
if err != nil {
return nil, fmt.Errorf("Error fetching all topics metadata: %+v", err)
}

return metadata, nil
}
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Client interface {
detailed bool,
) (TopicInfo, error)

// GetAllTopicsMetadata performs kafka-go metadata call to get topic information
GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)

// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
// keys that were updated.
UpdateTopicConfig(
Expand Down
199 changes: 199 additions & 0 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,205 @@ func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) str
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatTopicsPartitionsSummary creates a pretty table with summary of the
// partitions for topics.
func FormatTopicsPartitionsSummary(
topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int,
) string {
buf := &bytes.Buffer{}

headers := []string{
"Topic",
"Status",
"Count",
"IDs",
}
columnAligment := []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(true)
table.SetColumnAlignment(columnAligment)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

topicNames := []string{}
tableData := make(map[string][][]string)
for topicName, partitionsStatusSummary := range topicsPartitionsStatusSummary {
topicTableRows := [][]string{}

for partitionStatus, partitionStatusIDs := range partitionsStatusSummary {
topicTableRows = append(topicTableRows, []string{
fmt.Sprintf("%s", topicName),
fmt.Sprintf("%s", partitionStatus),
fmt.Sprintf("%d", len(partitionStatusIDs)),
fmt.Sprintf("%+v", partitionStatusIDs),
})
}

// sort the topicTableRows by partitionStatus
statusSort := func(i, j int) bool {
// second element in the row is of type PartitionStatus
return string(topicTableRows[i][1]) < string(topicTableRows[j][1])
}

sort.Slice(topicTableRows, statusSort)

tableData[topicName] = topicTableRows
topicNames = append(topicNames, topicName)
}

sort.Strings(topicNames)
for _, topicName := range topicNames {
_, exists := tableData[topicName]
if exists {
for _, topicTableRow := range tableData[topicName] {
table.Append(topicTableRow)
}
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatTopicsPartitions creates a pretty table with information on all of the
// partitions for topics.
func FormatTopicsPartitions(
topicsPartitionsStatusInfo map[string][]PartitionStatusInfo,
brokers []BrokerInfo,
) string {
buf := &bytes.Buffer{}

headers := []string{
"Topic",
"ID",
"Leader",
"ISR",
"Replicas",
"Distinct\nRacks",
"Racks",
"Status",
}
columnAligment := []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetColumnAlignment(columnAligment)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

topicNames := []string{}
brokerRacks := BrokerRacks(brokers)
tableData := make(map[string][][]string)
for topicName, partitionsStatusInfo := range topicsPartitionsStatusInfo {
topicTableRows := [][]string{}
for _, partitionStatusInfo := range partitionsStatusInfo {
racks := partitionStatusInfo.Racks(brokerRacks)

distinctRacks := make(map[string]int)
for _, rack := range racks {
distinctRacks[rack] += 1
}

partitionIsrs := []int{}
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
}

partitionReplicas := []int{}
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

inSync := true
if partitionStatusInfo.Status != Ok {
inSync = false
}

correctLeader := true
if partitionStatusInfo.LeaderState != CorrectLeader {
correctLeader = false
}

var statusPrinter func(f string, a ...interface{}) string
if !util.InTerminal() || inSync {
statusPrinter = fmt.Sprintf
} else if !inSync {
statusPrinter = color.New(color.FgRed).SprintfFunc()
}

var statePrinter func(f string, a ...interface{}) string
if !util.InTerminal() || correctLeader {
statePrinter = fmt.Sprintf
} else if !correctLeader {
statePrinter = color.New(color.FgCyan).SprintfFunc()
}

leaderStateString := fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID)
if !correctLeader {
leaderStateString = fmt.Sprintf("%d %+v", partitionStatusInfo.Partition.Leader.ID,
statePrinter("(%s)", string(partitionStatusInfo.LeaderState)),
)
}

topicTableRows = append(topicTableRows, []string{
fmt.Sprintf("%s", topicName),
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
leaderStateString,
fmt.Sprintf("%+v", partitionIsrs),
fmt.Sprintf("%+v", partitionReplicas),
fmt.Sprintf("%d", len(distinctRacks)),
fmt.Sprintf("%+v", racks),
fmt.Sprintf("%v", statusPrinter("%s", string(partitionStatusInfo.Status))),
})
}

tableData[topicName] = topicTableRows
topicNames = append(topicNames, topicName)
}

sort.Strings(topicNames)
for _, topicName := range topicNames {
_, exists := tableData[topicName]
if exists {
for _, topicTableRow := range tableData[topicName] {
table.Append(topicTableRow)
}
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatConfig creates a pretty table with all of the keys and values in a topic or
// broker config.
func FormatConfig(configMap map[string]string) string {
Expand Down
Loading

0 comments on commit 01c619c

Please sign in to comment.