Skip to content

Commit

Permalink
feat: cache amalthea sessions (#1983)
Browse files Browse the repository at this point in the history
* feat: cache amalthea sessions

Adapts the k8s watcher to cache the new amalthea sessions CRD.

---------

Co-authored-by: Flora Thiebaut <[email protected]>
  • Loading branch information
olevski and leafty authored Oct 10, 2024
1 parent e6461b7 commit 7f320d7
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 31 deletions.
4 changes: 2 additions & 2 deletions k8s-watcher/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM golang:1.19.0-alpine3.16 as builder
FROM golang:1.21-alpine3.20 as builder
COPY . /src
WORKDIR /src
RUN go build -o /k8s-watcher

FROM alpine:3
FROM alpine:3.20
COPY --from=builder /k8s-watcher /k8s-watcher
USER 1000:1000
ENTRYPOINT ["/k8s-watcher"]
18 changes: 16 additions & 2 deletions k8s-watcher/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func initializeK8sDynamicClient() (k8sDynamicClient dynamic.Interface, err error
return k8sDynamicClient, nil
}

// NewCacheFromConfig generates a new server cache from a configuration and a specfic k8s namespace.
func NewCacheFromConfig(ctx context.Context, config Config, namespace string) (res *Cache, err error) {
// NewJupyterServerCacheFromConfig generates a new server cache from a configuration and a specfic k8s namespace.
func NewJupyterServerCacheFromConfig(ctx context.Context, config Config, namespace string) (res *Cache, err error) {
k8sDynamicClient, err := initializeK8sDynamicClient()
if err != nil {
return
Expand All @@ -146,3 +146,17 @@ func NewCacheFromConfig(ctx context.Context, config Config, namespace string) (r
res = &Cache{informer: informer, lister: lister, namespace: namespace, userIDLabel: config.UserIDLabel}
return
}

// NewAmaltheaSessionCacheFromConfig generates a new server cache from a configuration and a specfic k8s namespace.
func NewAmaltheaSessionCacheFromConfig(ctx context.Context, config Config, namespace string) (res *Cache, err error) {
k8sDynamicClient, err := initializeK8sDynamicClient()
if err != nil {
return
}
resource := schema.GroupVersionResource{Group: config.AmaltheaSessionGroup, Version: config.AmaltheaSessionVersion, Resource: config.AmaltheaSessionPlural}
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k8sDynamicClient, time.Minute, namespace, nil)
informer := factory.ForResource(resource).Informer()
lister := factory.ForResource(resource).Lister()
res = &Cache{informer: informer, lister: lister, namespace: namespace, userIDLabel: config.UserIDLabel}
return
}
23 changes: 19 additions & 4 deletions k8s-watcher/cache_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c CacheCollection) synchronize(ctx context.Context, timeout time.Duration)
syncCount++
log.Printf("Synced %d/%d caches\n", syncCount, len(c))
case <-timeoutCh:
log.Fatalf("Syncing caches timed out after %d seconds\n.", timeout / time.Second)
log.Fatalf("Syncing caches timed out after %.f seconds\n.", timeout.Seconds())
}
}
log.Println("Synced all caches!")
Expand Down Expand Up @@ -135,13 +135,28 @@ func (c CacheCollection) getByName(name string) (res []runtime.Object, err error
return res, nil
}

// NewCacheCollectionFromConfigOrDie generates a new cache map from a configuration. If it cannot
// NewJupyterServerCacheCollectionFromConfigOrDie generates a new cache map from a configuration. If it cannot
// do this successfully it will terminate the program because the server cannot run at all if this
// step fails in any way and the program cannot recover from errors that occur here.
func NewCacheCollectionFromConfigOrDie(ctx context.Context, config Config) *CacheCollection {
func NewJupyterServerCacheCollectionFromConfigOrDie(ctx context.Context, config Config) *CacheCollection {
caches := CacheCollection{}
for _, namespace := range config.Namespaces {
cache, err := NewCacheFromConfig(ctx, config, namespace)
cache, err := NewJupyterServerCacheFromConfig(ctx, config, namespace)
if err != nil {
log.Fatalf("Cannot create cache collection: %v\n", err)
}
caches[namespace] = cache
}
return &caches
}

// NewAmaltheaSessionCacheCollectionFromConfigOrDie generates a new cache map from a configuration. If it cannot
// do this successfully it will terminate the program because the server cannot run at all if this
// step fails in any way and the program cannot recover from errors that occur here.
func NewAmaltheaSessionCacheCollectionFromConfigOrDie(ctx context.Context, config Config) *CacheCollection {
caches := CacheCollection{}
for _, namespace := range config.Namespaces {
cache, err := NewAmaltheaSessionCacheFromConfig(ctx, config, namespace)
if err != nil {
log.Fatalf("Cannot create cache collection: %v\n", err)
}
Expand Down
24 changes: 24 additions & 0 deletions k8s-watcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type Config struct {
CrVersion string
// The plural name of the k8s resource that shoud be cached.
CrPlural string
// The group of the AmaltheaSession resource that shoud be cached.
AmaltheaSessionGroup string
// The version of the AmaltheaSession resource that shoud be cached.
AmaltheaSessionVersion string
// The plural name of the AmaltheaSession resource that shoud be cached.
AmaltheaSessionPlural string
// The port where the server will listen to for providing responses to requests
// about listing the cached resources or for returning specific resources.
Port int
Expand Down Expand Up @@ -66,6 +72,24 @@ func NewConfigFromEnvOrDie(prefix string) Config {
log.Fatalf("invalid configuration, %sCR_PLURAL must be provided", prefix)
}

if asGroup, ok := os.LookupEnv(fmt.Sprintf("%sAMALTHEA_SESSION_GROUP", prefix)); ok {
config.AmaltheaSessionGroup = asGroup
} else {
config.AmaltheaSessionGroup = "amalthea.dev"
}

if asVersion, ok := os.LookupEnv(fmt.Sprintf("%sAMALTHEA_SESSION_VERSION", prefix)); ok {
config.AmaltheaSessionVersion = asVersion
} else {
config.AmaltheaSessionVersion = "v1alpha1"
}

if asPlural, ok := os.LookupEnv(fmt.Sprintf("%sAMALTHEA_SESSION_PLURAL", prefix)); ok {
config.AmaltheaSessionPlural = asPlural
} else {
config.AmaltheaSessionPlural = "amaltheasessions"
}

if port, ok := os.LookupEnv(fmt.Sprintf("%sPORT", prefix)); ok {
portInt, err := strconv.Atoi(port)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion k8s-watcher/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/SwissDataScienceCenter/renku-notebooks/k8s-watcher

go 1.19
go 1.21

toolchain go1.21.11

require (
github.com/julienschmidt/httprouter v1.3.0
Expand Down
10 changes: 10 additions & 0 deletions k8s-watcher/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
Expand All @@ -14,6 +15,7 @@ github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -28,6 +30,7 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
Expand All @@ -42,6 +45,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -56,11 +60,15 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -71,6 +79,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down Expand Up @@ -109,6 +118,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
25 changes: 15 additions & 10 deletions k8s-watcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
// Server represents the http server and associated components that do cachcing
// of k8s resources.
type Server struct {
caches CacheCollection
config Config
router *httprouter.Router
cachesJS CacheCollection
cachesAS CacheCollection
config Config
router *httprouter.Router
*http.Server
}

Expand Down Expand Up @@ -46,8 +47,10 @@ func (s *Server) Initialize(ctx context.Context) {
log.Println("Initializing http server...")
s.registerRoutes()
s.Handler = s
go s.caches.run(ctx)
s.caches.synchronize(ctx, s.config.CacheSyncTimeout)
go s.cachesJS.run(ctx)
go s.cachesAS.run(ctx)
s.cachesJS.synchronize(ctx, s.config.CacheSyncTimeout)
s.cachesAS.synchronize(ctx, s.config.CacheSyncTimeout)
}

func (s *Server) respond(w http.ResponseWriter, req *http.Request, data interface{}, err error) {
Expand All @@ -67,13 +70,15 @@ func (s *Server) respond(w http.ResponseWriter, req *http.Request, data interfac

// NewServerFromConfigOrDie creates a new Server from a configuration or panics
func NewServerFromConfigOrDie(ctx context.Context, config Config) *Server {
cacheCollection := NewCacheCollectionFromConfigOrDie(ctx, config)
cacheCollectionJS := NewJupyterServerCacheCollectionFromConfigOrDie(ctx, config)
cacheCollectionAS := NewAmaltheaSessionCacheCollectionFromConfigOrDie(ctx, config)
return &Server{
config: config,
caches: *cacheCollection,
router: httprouter.New(),
config: config,
cachesJS: *cacheCollectionJS,
cachesAS: *cacheCollectionAS,
router: httprouter.New(),
Server: &http.Server{
Addr: fmt.Sprintf(":%d", config.Port),
Addr: fmt.Sprintf(":%d", config.Port),
},
}
}
53 changes: 41 additions & 12 deletions k8s-watcher/server_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,62 @@ import (

// routers registers the handlers for all http endpoints the server supports.
func (s *Server) registerRoutes() {
s.router.HandlerFunc("GET", "/servers", s.handleIndex)
s.router.HandlerFunc("GET", "/servers/:serverID", s.handleServerID)
s.router.HandlerFunc("GET", "/users/:userID/servers", s.handleUserID)
s.router.HandlerFunc("GET", "/users/:userID/servers/:serverID", s.handleUserIDServerID)
s.router.HandlerFunc("GET", "/health", s.handleHealthCheck)
// Used for the old amalthea operator in charge of jupyterservers custom resources
s.router.HandlerFunc("GET", "/servers", s.jsGetAll)
s.router.HandlerFunc("GET", "/servers/:serverID", s.jsGetOne)
s.router.HandlerFunc("GET", "/users/:userID/servers", s.jsUserID)
s.router.HandlerFunc("GET", "/users/:userID/servers/:serverID", s.jsUserIDServerID)
// Used for the new amalthea operator in charge of amaltheasessions custom resources
s.router.HandlerFunc("GET", "/sessions", s.asGetAll)
s.router.HandlerFunc("GET", "/sessions/:serverID", s.asGetOne)
s.router.HandlerFunc("GET", "/users/:userID/sessions", s.asUserID)
s.router.HandlerFunc("GET", "/users/:userID/sessions/:serverID", s.asUserIDServerID)
}

func (s *Server) handleIndex(w http.ResponseWriter, req *http.Request) {
output, err := s.caches.getAll()
func (s *Server) jsGetAll(w http.ResponseWriter, req *http.Request) {
output, err := s.cachesJS.getAll()
s.respond(w, req, output, err)
}

func (s *Server) handleServerID(w http.ResponseWriter, req *http.Request) {
func (s *Server) jsGetOne(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.caches.getByName(params.ByName("serverID"))
output, err := s.cachesJS.getByName(params.ByName("serverID"))
s.respond(w, req, output, err)
}

func (s *Server) handleUserID(w http.ResponseWriter, req *http.Request) {
func (s *Server) jsUserID(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.caches.getByUserID(params.ByName("userID"))
output, err := s.cachesJS.getByUserID(params.ByName("userID"))
s.respond(w, req, output, err)
}

func (s *Server) handleUserIDServerID(w http.ResponseWriter, req *http.Request) {
func (s *Server) jsUserIDServerID(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.caches.getByNameAndUserID(params.ByName("serverID"), params.ByName("userID"))
output, err := s.cachesJS.getByNameAndUserID(params.ByName("serverID"), params.ByName("userID"))
s.respond(w, req, output, err)
}

func (s *Server) asGetAll(w http.ResponseWriter, req *http.Request) {
output, err := s.cachesAS.getAll()
s.respond(w, req, output, err)
}

func (s *Server) asGetOne(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.cachesAS.getByName(params.ByName("serverID"))
s.respond(w, req, output, err)
}

func (s *Server) asUserID(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.cachesAS.getByUserID(params.ByName("userID"))
s.respond(w, req, output, err)
}

func (s *Server) asUserIDServerID(w http.ResponseWriter, req *http.Request) {
params := httprouter.ParamsFromContext(req.Context())
output, err := s.cachesAS.getByNameAndUserID(params.ByName("serverID"), params.ByName("userID"))
s.respond(w, req, output, err)
}

Expand Down

0 comments on commit 7f320d7

Please sign in to comment.