Skip to content

Commit

Permalink
All Resource Event Sources can handle multiple seconday resources for…
Browse files Browse the repository at this point in the history
… a primary resources (#1169)
  • Loading branch information
csviri authored Apr 22, 2022
1 parent dca90ed commit 3c3c5bd
Show file tree
Hide file tree
Showing 27 changed files with 517 additions and 335 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -10,7 +8,6 @@
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.MultiResourceOwner;

public class DefaultContext<P extends HasMetadata> implements Context<P> {

Expand All @@ -37,17 +34,8 @@ public Optional<RetryInfo> getRetryInfo() {
@SuppressWarnings("unchecked")
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.map(
es -> {
if (es instanceof MultiResourceOwner) {
return ((MultiResourceOwner<T, P>) es).getSecondaryResources(primaryResource);
} else {
return es.getSecondaryResource(primaryResource)
.map(List::of)
.orElse(Collections.emptyList());
}
})
.flatMap(List::stream)
.map(es -> es.getSecondaryResources(primaryResource))
.flatMap(Set::stream)
.collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public interface RecentOperationCacheFiller<R> {

void handleRecentResourceCreate(ResourceID resourceID, R resource);

void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion);
void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousVersionOfResource);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource;
import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;

public abstract class AbstractCachingDependentResource<R, P extends HasMetadata>
extends
Expand All @@ -15,8 +15,6 @@ protected AbstractCachingDependentResource(Class<R> resourceType) {
this.resourceType = resourceType;
}

public abstract Optional<R> fetchResource(P primaryResource);

@Override
public Class<R> resourceType() {
return resourceType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.javaoperatorsdk.operator.processing.dependent.external;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;

public abstract class AbstractPollingDependentResource<R, P extends HasMetadata>
extends AbstractCachingDependentResource<R, P> {
extends AbstractCachingDependentResource<R, P> implements CacheKeyMapper<R> {

public static final int DEFAULT_POLLING_PERIOD = 5000;
private long pollingPeriod;
Expand All @@ -24,4 +25,10 @@ public void setPollingPeriod(long pollingPeriod) {
public long getPollingPeriod() {
return pollingPeriod;
}

// for now dependent resources support event sources only with one owned resource.
@Override
public String keyFor(R resource) {
return CacheKeyMapper.singleResourceCacheKeyMapper().keyFor(resource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;

public abstract class PerResourcePollingDependentResource<R, P extends HasMetadata>
extends AbstractPollingDependentResource<R, P>
implements PerResourcePollingEventSource.ResourceFetcher<R, P> {


public PerResourcePollingDependentResource(Class<R> resourceType) {
super(resourceType);
}
Expand All @@ -20,6 +22,7 @@ public PerResourcePollingDependentResource(Class<R> resourceType, long pollingPe
protected ExternalResourceCachingEventSource<R, P> createEventSource(
EventSourceContext<P> context) {
return new PerResourcePollingEventSource<>(this, context.getPrimaryCache(),
getPollingPeriod(), resourceType());
getPollingPeriod(), resourceType(), this);
}

}
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
package io.javaoperatorsdk.operator.processing.dependent.external;

import java.util.Map;
import java.util.function.Supplier;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource;

public abstract class PollingDependentResource<R, P extends HasMetadata>
extends AbstractPollingDependentResource<R, P> implements Supplier<Map<ResourceID, R>> {
extends AbstractPollingDependentResource<R, P>
implements PollingEventSource.GenericResourceFetcher<R> {

private final CacheKeyMapper<R> cacheKeyMapper;

public PollingDependentResource(Class<R> resourceType) {
public PollingDependentResource(Class<R> resourceType, CacheKeyMapper<R> cacheKeyMapper) {
super(resourceType);
this.cacheKeyMapper = cacheKeyMapper;
}

public PollingDependentResource(Class<R> resourceType, long pollingPeriod) {
public PollingDependentResource(Class<R> resourceType, long pollingPeriod,
CacheKeyMapper<R> cacheKeyMapper) {
super(resourceType, pollingPeriod);
this.cacheKeyMapper = cacheKeyMapper;
}

@Override
protected ExternalResourceCachingEventSource<R, P> createEventSource(
EventSourceContext<P> context) {
return new PollingEventSource<>(this, getPollingPeriod(), resourceType());
return new PollingEventSource<>(this, getPollingPeriod(), resourceType(), cacheKeyMapper);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.javaoperatorsdk.operator.processing.event.EventHandler;

public abstract class AbstractEventSource implements EventSource {

private EventHandler handler;
private volatile boolean running = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;

public abstract class AbstractResourceEventSource<P extends HasMetadata, R>
public abstract class AbstractResourceEventSource<R, P extends HasMetadata>
extends AbstractEventSource
implements ResourceEventSource<R, P> {
private final Class<R> resourceClass;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.javaoperatorsdk.operator.processing.event.source;

public interface CacheKeyMapper<R> {

String keyFor(R resource);

/**
* Used if a polling event source handles only single secondary resource. See also docs for:
* {@link ExternalResourceCachingEventSource}
*
* @return static id mapper, all resources are mapped for same id.
* @param <T> secondary resource type
*/
static <T> CacheKeyMapper<T> singleResourceCacheKeyMapper() {
return r -> "id";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @param <R> represents the type of resources (usually external non-kubernetes ones) being handled.
*/
public abstract class CachingEventSource<R, P extends HasMetadata>
extends AbstractResourceEventSource<P, R> implements Cache<R> {
extends AbstractResourceEventSource<R, P> implements Cache<R> {

protected UpdatableCache<R> cache;

Expand Down Expand Up @@ -43,12 +43,9 @@ public Stream<R> list(Predicate<R> predicate) {
return cache.list(predicate);
}

protected UpdatableCache<R> initCache() {
return new ConcurrentHashMapCache<>();
}

public Optional<R> getCachedValue(ResourceID resourceID) {
return cache.get(resourceID);
}

protected abstract UpdatableCache<R> initCache();
}
Loading

0 comments on commit 3c3c5bd

Please sign in to comment.