Skip to content

Commit

Permalink
feat(storage): GCS backend using thanos.io/objstore (#11132)
Browse files Browse the repository at this point in the history
Signed-off-by: Joao Marcal <[email protected]>
Signed-off-by: Kaviraj <[email protected]>
Signed-off-by: Kaviraj Kanagaraj <[email protected]>
Signed-off-by: Ashwanth Goli <[email protected]>
Co-authored-by: Kaviraj Kanagaraj <[email protected]>
Co-authored-by: Ashwanth <[email protected]>
  • Loading branch information
3 people authored Oct 18, 2024
1 parent 71d1c6b commit c059ace
Show file tree
Hide file tree
Showing 176 changed files with 5,481 additions and 2,694 deletions.
57 changes: 57 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,17 @@ storage:
# CLI flag: -common.storage.congestion-control.hedge.strategy
[strategy: <string> | default = ""]
object_store:
# The gcs_storage_backend block configures the connection to Google Cloud
# Storage object storage backend.
# The CLI flags prefix for this block configuration is: common.storage
[gcs: <gcs_storage_backend>]
# Prefix for all objects stored in the backend storage. For simplicity, it
# may only contain digits and English alphabet letters.
# CLI flag: -common.storage.object-store.storage-prefix
[storage_prefix: <string> | default = ""]
[persist_tokens: <boolean>]
[replication_factor: <int>]
Expand Down Expand Up @@ -2554,6 +2565,35 @@ The `frontend_worker` configures the worker - running within the Loki querier -
[query_scheduler_grpc_client: <grpc_client>]
```

### gcs_storage_backend

The `gcs_storage_backend` block configures the connection to Google Cloud Storage object storage backend.

```yaml
# GCS bucket name
# CLI flag: -<prefix>.object-store.gcs.bucket-name
[bucket_name: <string> | default = ""]
# JSON either from a Google Developers Console client_credentials.json file, or
# a Google Developers service account key. Needs to be valid JSON, not a
# filesystem path. If empty, fallback to Google default logic:
# 1. A JSON file whose path is specified by the GOOGLE_APPLICATION_CREDENTIALS
# environment variable. For workload identity federation, refer to
# https://cloud.google.com/iam/docs/how-to#using-workload-identity-federation on
# how to generate the JSON configuration file for on-prem/non-Google cloud
# platforms.
# 2. A JSON file in a location known to the gcloud command-line tool:
# $HOME/.config/gcloud/application_default_credentials.json.
# 3. On Google Compute Engine it fetches credentials from the metadata server.
# CLI flag: -<prefix>.object-store.gcs.service-account
[service_account: <string> | default = ""]
# The maximum size of the buffer that GCS client for a single PUT request. 0 to
# disable buffering.
# CLI flag: -<prefix>.object-store.gcs.chunk-buffer-size
[chunk_buffer_size: <int> | default = 0]
```

### gcs_storage_config

The `gcs_storage_config` block configures the connection to Google Cloud Storage object storage backend. The supported CLI flags `<prefix>` used to reference this configuration block are:
Expand Down Expand Up @@ -5646,6 +5686,23 @@ congestion_control:
# CLI flag: -store.max-parallel-get-chunk
[max_parallel_get_chunk: <int> | default = 150]

# Enables the use of thanos-io/objstore clients for connecting to object
# storage. When set to true, the configuration inside
# `storage_config.object_store` or `common.storage.object_store` block takes
# effect.
# CLI flag: -use-thanos-objstore
[use_thanos_objstore: <boolean> | default = false]

object_store:
# The gcs_storage_backend block configures the connection to Google Cloud
# Storage object storage backend.
[gcs: <gcs_storage_backend>]

# Prefix for all objects stored in the backend storage. For simplicity, it may
# only contain digits and English alphabet letters.
# CLI flag: -object-store.storage-prefix
[storage_prefix: <string> | default = ""]

# The maximum number of chunks to fetch per batch.
# CLI flag: -store.max-chunk-batch-size
[max_chunk_batch_size: <int> | default = 50]
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ require (
golang.org/x/sys v0.25.0
golang.org/x/time v0.6.0
google.golang.org/api v0.193.0
google.golang.org/grpc v1.65.0
google.golang.org/grpc v1.66.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -139,7 +139,7 @@ require (
github.com/richardartoul/molecule v1.0.0
github.com/schollz/progressbar/v3 v3.14.6
github.com/shirou/gopsutil/v4 v4.24.8
github.com/thanos-io/objstore v0.0.0-20240818203309-0363dadfdfb1
github.com/thanos-io/objstore v0.0.0-20241015070247-5f04b8b0b52a
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1795,8 +1795,8 @@ github.com/tedsuo/ifrit v0.0.0-20191009134036-9a97d0632f00/go.mod h1:eyZnKCc955u
github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-io/objstore v0.0.0-20240818203309-0363dadfdfb1 h1:z0v9BB/p7s4J6R//+0a5M3wCld8KzNjrGRLIwXfrAZk=
github.com/thanos-io/objstore v0.0.0-20240818203309-0363dadfdfb1/go.mod h1:3ukSkG4rIRUGkKM4oIz+BSuUx2e3RlQVVv3Cc3W+Tv4=
github.com/thanos-io/objstore v0.0.0-20241015070247-5f04b8b0b52a h1:0etzAoXPjVVUnscliA+xy8vWdE88jbvhcVMr1rVHc60=
github.com/thanos-io/objstore v0.0.0-20241015070247-5f04b8b0b52a/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down Expand Up @@ -2659,8 +2659,8 @@ google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACu
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func New(
builderID := uuid.NewString()
logger = log.With(logger, "builder_id", builderID)

tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
tsdbStore, err := common.NewTSDBStores("bloom-builder", schemaCfg, storeCfg, storageMetrics, logger)
if err != nil {
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type TSDBStores struct {
}

func NewTSDBStores(
component string,
schemaCfg config.SchemaConfig,
storeCfg baseStore.Config,
clientMetrics baseStore.ClientMetrics,
Expand All @@ -185,8 +186,7 @@ func NewTSDBStores(

for i, cfg := range schemaCfg.Configs {
if cfg.IndexType == types.TSDBType {

c, err := baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics)
c, err := baseStore.NewObjectClient(cfg.ObjectType, component, storeCfg, clientMetrics)
if err != nil {
return nil, errors.Wrap(err, "failed to create object client")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)

tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
tsdbStore, err := common.NewTSDBStores("bloom-planner", schemaCfg, storeCfg, storageMetrics, logger)
if err != nil {
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ func makeWriteRequestWithLabelsWithLevel(lines, size int, labels []string, level

for j := 0; j < lines; j++ {
// Construct the log line, honoring the input size
line := "msg=an error occured " + strconv.Itoa(j) + strings.Repeat("0", size) + " severity=" + level
line := "msg=an error occurred " + strconv.Itoa(j) + strings.Repeat("0", size) + " severity=" + level

stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Now().Add(time.Duration(j) * time.Millisecond),
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func New(
return periodicConfigs[i].From.Time.Before(periodicConfigs[j].From.Time)
})
for _, periodicConfig := range periodicConfigs {
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics)
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, "storage-rf1", storageConfig, clientMetrics)
if err != nil {
return nil, fmt.Errorf("creating object client for period %s: %w ", periodicConfig.From, err)
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,15 +538,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
}

func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) {
oc, err := storage.NewObjectClient(
store,
conf.StorageConfig,
cm,
)
if err != nil {
return nil, err
}
return oc, nil
return storage.NewObjectClient(store, "logcli-query", conf.StorageConfig, cm)
}

var errNotExists = stdErrors.New("doesn't exist")
Expand Down
4 changes: 4 additions & 0 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/netutil"

"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/alibaba"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/azure"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Storage struct {
Hedging hedging.Config `yaml:"hedging"`
COS ibmcloud.COSConfig `yaml:"cos"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
ObjectStore bucket.Config `yaml:"object_store"`
}

func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -91,6 +93,8 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.Hedging.RegisterFlagsWithPrefix(prefix, f)
s.COS.RegisterFlagsWithPrefix(prefix, f)
s.CongestionControl.RegisterFlagsWithPrefix(prefix, f)

s.ObjectStore.RegisterFlagsWithPrefix(prefix, f)
}

type FilesystemConfig struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,12 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
}

if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) {
applyConfig = func(r *ConfigWrapper) {
r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore
}
}

if configsFound > 1 {
return ErrTooManyStorageConfigs
}
Expand Down
41 changes: 39 additions & 2 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ ruler:
})

t.Run("explicit storage config provided via config file is preserved", func(t *testing.T) {
specificRulerConfig := `common:
explicitStorageConfig := `common:
storage:
gcs:
bucket_name: foobar
Expand All @@ -749,7 +749,7 @@ storage_config:
access_key_id: abc123
secret_access_key: def789`

config, defaults := testContext(specificRulerConfig, nil)
config, defaults := testContext(explicitStorageConfig, nil)

assert.Equal(t, "s3://foo-bucket", config.StorageConfig.AWSStorageConfig.S3Config.Endpoint)
assert.Equal(t, "us-east1", config.StorageConfig.AWSStorageConfig.S3Config.Region)
Expand All @@ -765,6 +765,43 @@ storage_config:
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
})

t.Run("when common object_store config is provided, storage_config and rulers should use it", func(t *testing.T) {
commonStorageConfig := `common:
storage:
object_store:
gcs:
bucket_name: foobar
chunk_buffer_size: 17`

config, _ := testContext(commonStorageConfig, nil)

assert.Equal(t, "foobar", config.StorageConfig.ObjectStore.GCS.BucketName)
assert.Equal(t, 17, config.StorageConfig.ObjectStore.GCS.ChunkBufferSize)

// TODO: common config should be set on ruler bucket config
})

t.Run("explicit thanos object storage config provided via config file is preserved", func(t *testing.T) {
explicitStorageConfig := `common:
storage:
object_store:
gcs:
bucket_name: foobar
chunk_buffer_size: 17
storage_config:
object_store:
gcs:
bucket_name: barfoo
chunk_buffer_size: 27`

config, _ := testContext(explicitStorageConfig, nil)

assert.Equal(t, "barfoo", config.StorageConfig.ObjectStore.GCS.BucketName)
assert.Equal(t, 27, config.StorageConfig.ObjectStore.GCS.ChunkBufferSize)

// TODO: common config should be set on ruler bucket config
})

t.Run("named storage config provided via config file is preserved", func(t *testing.T) {
namedStoresConfig := `common:
storage:
Expand Down
12 changes: 6 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)

tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.ClientMetrics, reg, util_log.Logger)
tableClient, err := storage.NewTableClient(lastConfig.IndexType, "table-manager", *lastConfig, t.Cfg.StorageConfig, t.ClientMetrics, reg, util_log.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1419,7 +1418,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
continue
}

objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, "compactor", t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
return nil, fmt.Errorf("failed to create object client: %w", err)
}
Expand All @@ -1430,7 +1429,8 @@ func (t *Loki) initCompactor() (services.Service, error) {
var deleteRequestStoreClient client.ObjectClient
if t.Cfg.CompactorConfig.RetentionEnabled {
if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" {
if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics); err != nil {
deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, "delete-store", t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
return nil, fmt.Errorf("failed to create delete request store object client: %w", err)
}
} else {
Expand Down Expand Up @@ -1495,7 +1495,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
indexClient, err := storage.NewIndexClient("index-store", period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace,
)
if err != nil {
Expand Down Expand Up @@ -1741,7 +1741,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {
return nil, err
}

objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
objectClient, err := storage.NewObjectClient(period.ObjectType, "analytics", t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
level.Info(util_log.Logger).Log("msg", "failed to initialize usage report", "err", err)
return nil, nil
Expand Down
12 changes: 3 additions & 9 deletions pkg/ruler/base/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetr
}

// NewRuleStore returns a rule store backend client based on the provided cfg.
func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, logger log.Logger, reg prometheus.Registerer) (rulestore.RuleStore, error) {
func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, logger log.Logger, _ prometheus.Registerer) (rulestore.RuleStore, error) {
if cfg.Backend == configdb.Name {
c, err := configClient.New(cfg.ConfigDB)
if err != nil {
Expand All @@ -136,16 +136,10 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.
if cfg.Backend == local.Name {
return local.NewLocalRulesClient(cfg.Local, loader)
}

bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, reg)
if err != nil {
return nil, err
}

store := bucketclient.NewBucketRuleStore(bucketClient, cfgProvider, logger)
bucketClient, err := bucket.NewClient(ctx, cfg.Backend, cfg.Config, "ruler-storage", logger)
if err != nil {
return nil, err
}

return store, nil
return bucketclient.NewBucketRuleStore(bucketClient, cfgProvider, logger), nil
}
2 changes: 2 additions & 0 deletions pkg/ruler/rulestore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// Config configures a rule store.
type Config struct {
bucket.Config `yaml:",inline"`
Backend string `yaml:"backend"`
ConfigDB client.Config `yaml:"configdb"`
Local local.Config `yaml:"local"`
}
Expand All @@ -26,6 +27,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ExtraBackends = []string{configdb.Name, local.Name}
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
f.StringVar(&cfg.Backend, prefix+"backend", "filesystem", "Backend storage to use. Supported backends are: s3, gcs, azure, swift, filesystem.")
cfg.RegisterFlagsWithPrefix(prefix, f)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
return nil, err
}

return azure.NewBucket(logger, serialized, name)
return azure.NewBucket(logger, serialized, name, nil)
}
Loading

0 comments on commit c059ace

Please sign in to comment.