Skip to content

Commit

Permalink
Merge pull request SeaseLtd#126 from mattflax/GH91_platform_checks
Browse files Browse the repository at this point in the history
Add check for platform availability before evaluating queries
  • Loading branch information
agazzarini authored Nov 13, 2020
2 parents 49dcf38 + c90e3a0 commit 02b4275
Show file tree
Hide file tree
Showing 20 changed files with 1,028 additions and 299 deletions.
180 changes: 104 additions & 76 deletions rre-core/src/main/java/io/sease/rre/core/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.sease.rre.persistence.PersistenceHandler;
import io.sease.rre.persistence.PersistenceManager;
import io.sease.rre.search.api.SearchPlatform;
import io.sease.rre.search.api.SearchPlatformException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -47,9 +48,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.ZipEntry;
Expand Down Expand Up @@ -243,68 +246,10 @@ public Evaluation evaluate(final Map<String, Object> configuration) {

final Evaluation evaluation = new Evaluation();

ratings().forEach(ratingsNode -> {
LOGGER.info("RRE: Ratings Set processing starts");

final String indexName =
requireNonNull(
ratingsNode.get(INDEX_NAME),
"WARNING!!! \"" + INDEX_NAME + "\" attribute not found!").asText();
final String idFieldName =
requireNonNull(
ratingsNode.get(ID_FIELD_NAME),
"WARNING!!! \"" + ID_FIELD_NAME + "\" attribute not found!")
.asText(DEFAULT_ID_FIELD_NAME);

final Optional<File> data = data(ratingsNode);
final String queryPlaceholder = ofNullable(ratingsNode.get("query_placeholder")).map(JsonNode::asText).orElse("$query");

LOGGER.info("");
LOGGER.info("*********************************");
LOGGER.info("RRE: Index name => " + indexName);
LOGGER.info("RRE: ID Field name => " + idFieldName);

data.ifPresent(file -> LOGGER.info("RRE: Test Collection => " + file.getAbsolutePath()));
prepareData(indexName, data.orElse(null));

final Corpus corpus = evaluation.findOrCreate(data.map(File::getName).orElse(indexName), Corpus::new);
all(ratingsNode, TOPICS)
.forEach(topicNode -> {
final Topic topic = corpus.findOrCreate(name(topicNode), Topic::new);

LOGGER.info("TOPIC: " + topic.getName());

all(topicNode, QUERY_GROUPS)
.forEach(groupNode -> {
final QueryGroup group = topic.findOrCreate(name(groupNode), QueryGroup::new);

LOGGER.info("\tQUERY GROUP: " + group.getName());

final String sharedTemplate = ofNullable(groupNode.get("template")).map(JsonNode::asText).orElse(null);
all(groupNode, QUERIES)
.forEach(queryNode -> {
final String queryString = queryNode.findValue(queryPlaceholder).asText();

LOGGER.info("\t\tQUERY: " + queryString);

final JsonNode relevantDocuments = relevantDocuments(
Optional.ofNullable(queryNode.get(RELEVANT_DOCUMENTS))
.orElse(groupNode.get(RELEVANT_DOCUMENTS)));
final Query queryEvaluation = group.findOrCreate(queryString, Query::new);
queryEvaluation.setIdFieldName(idFieldName);
queryEvaluation.setRelevantDocuments(relevantDocuments);

List<Metric> metrics = availableMetrics(idFieldName, relevantDocuments,
new ArrayList<>(versionManager.getConfigurationVersions()));
queryEvaluation.prepare(metrics);

evaluationManager.evaluateQuery(queryEvaluation, indexName, queryNode, sharedTemplate,
Math.max(relevantDocuments.size(), minimumRequiredResults(metrics)));
});
});
});
});
// Start the evaluation process for all of the ratings nodes
ratings().forEach(ratingsNode -> evaluateRatings(evaluation, ratingsNode));

// Wait for the evaluations to complete
while (evaluationManager.isRunning()) {
LOGGER.info(" ... completed {} / {} evaluations ...",
(evaluationManager.getTotalQueries() - evaluationManager.getQueriesRemaining()),
Expand All @@ -314,7 +259,12 @@ public Evaluation evaluate(final Map<String, Object> configuration) {
} catch (InterruptedException ignore) {
}
}
LOGGER.info(" ... completed all {} evaluations.", evaluationManager.getTotalQueries());

if (evaluationManager.getTotalQueries() > 0) {
LOGGER.info(" ... completed all {} evaluations.", evaluationManager.getTotalQueries());
} else {
LOGGER.warn(" ... no queries evaluated!");
}

return evaluation;
} finally {
Expand All @@ -327,6 +277,80 @@ public Evaluation evaluate(final Map<String, Object> configuration) {
}
}

/**
 * Evaluates a single ratings set, registering its queries on the given evaluation.
 *
 * <p>The target index is prepared first through {@code prepareData}. If that step
 * fails with a {@link SearchPlatformException}, the failure is logged and the
 * ratings set is skipped without rethrowing, so the caller can carry on with
 * the remaining ratings sets. Otherwise every query found under
 * topics / query groups / queries is prepared with its metrics and handed to
 * the evaluation manager.
 *
 * @param evaluation  the evaluation holding the query results.
 * @param ratingsNode the contents of the ratings set.
 * @throws NullPointerException if the index name or id field attribute is
 *                              missing from the ratings set, or if a query
 *                              node has no value for the query placeholder.
 */
private void evaluateRatings(Evaluation evaluation, JsonNode ratingsNode) {
    LOGGER.info("RRE: Ratings Set processing starts");

    final String indexName =
            requireNonNull(
                    ratingsNode.get(INDEX_NAME),
                    "WARNING!!! \"" + INDEX_NAME + "\" attribute not found!").asText();
    // NOTE(review): requireNonNull rejects a missing attribute, so
    // DEFAULT_ID_FIELD_NAME only acts as the asText() fallback value.
    // Confirm whether a missing id field should default instead of failing.
    final String idFieldName =
            requireNonNull(
                    ratingsNode.get(ID_FIELD_NAME),
                    "WARNING!!! \"" + ID_FIELD_NAME + "\" attribute not found!")
                    .asText(DEFAULT_ID_FIELD_NAME);

    final Optional<File> data = data(ratingsNode);
    final String queryPlaceholder = ofNullable(ratingsNode.get("query_placeholder"))
            .map(JsonNode::asText)
            .orElse("$query");

    LOGGER.info("");
    LOGGER.info("*********************************");
    LOGGER.info("RRE: Index name => {}", indexName);
    LOGGER.info("RRE: ID Field name => {}", idFieldName);
    data.ifPresent(file -> LOGGER.info("RRE: Test Collection => {}", file.getAbsolutePath()));

    try {
        // Load the data. If the collection being loaded cannot be reached,
        // this will fail.
        prepareData(indexName, data.orElse(null));

        final Corpus corpus = evaluation.findOrCreate(data.map(File::getName).orElse(indexName), Corpus::new);
        all(ratingsNode, TOPICS)
                .forEach(topicNode -> {
                    final Topic topic = corpus.findOrCreate(name(topicNode), Topic::new);

                    LOGGER.info("TOPIC: {}", topic.getName());

                    all(topicNode, QUERY_GROUPS)
                            .forEach(groupNode -> {
                                final QueryGroup group = topic.findOrCreate(name(groupNode), QueryGroup::new);

                                LOGGER.info("\tQUERY GROUP: {}", group.getName());

                                final String sharedTemplate = ofNullable(groupNode.get("template"))
                                        .map(JsonNode::asText)
                                        .orElse(null);
                                all(groupNode, QUERIES)
                                        .forEach(queryNode -> {
                                            // Fail with an explicit message rather than a bare
                                            // NullPointerException when the placeholder is absent.
                                            final String queryString =
                                                    requireNonNull(
                                                            queryNode.findValue(queryPlaceholder),
                                                            "WARNING!!! \"" + queryPlaceholder + "\" value not found in query node!")
                                                            .asText();

                                            LOGGER.info("\t\tQUERY: {}", queryString);

                                            // Query-level relevant documents take precedence
                                            // over the ones declared on the query group.
                                            final JsonNode relevantDocuments = relevantDocuments(
                                                    ofNullable(queryNode.get(RELEVANT_DOCUMENTS))
                                                            .orElse(groupNode.get(RELEVANT_DOCUMENTS)));
                                            final Query queryEvaluation = group.findOrCreate(queryString, Query::new);
                                            queryEvaluation.setIdFieldName(idFieldName);
                                            queryEvaluation.setRelevantDocuments(relevantDocuments);

                                            final List<Metric> metrics = availableMetrics(idFieldName, relevantDocuments,
                                                    new ArrayList<>(versionManager.getConfigurationVersions()));
                                            queryEvaluation.prepare(metrics);

                                            // Ask for enough results to cover both the judged
                                            // documents and what the metrics require.
                                            evaluationManager.evaluateQuery(queryEvaluation, indexName, queryNode, sharedTemplate,
                                                    Math.max(relevantDocuments.size(), minimumRequiredResults(metrics)));
                                        });
                            });
                });
    } catch (SearchPlatformException spe) {
        // The exception is passed as the trailing argument so that the stack
        // trace and any underlying cause are logged, not just the message.
        LOGGER.error("SearchPlatform error while evaluating ratings for index {}: {}",
                indexName, spe.getMessage(), spe);
    }
}

private Optional<File> data(final JsonNode ratingsNode) {
final File retFile;

Expand Down Expand Up @@ -480,28 +504,32 @@ private Stream<JsonNode> all(final JsonNode source, final String name) {
/**
* Prepares the search platform with the given index name and dataset.
*
* @param collection the index name.
* @param dataToBeIndexed the dataset.
* @param collection the index name.
* @param dataToBeIndexed the dataset.
* @throws SearchPlatformException if problems occur loading data to the
* search platform.
*/
private void prepareData(final String collection, final File dataToBeIndexed) {
private void prepareData(final String collection, final File dataToBeIndexed) throws SearchPlatformException {
if (dataToBeIndexed != null) {
LOGGER.info("Preparing dataToBeIndexed for " + collection + " from " + dataToBeIndexed.getAbsolutePath());
} else {
LOGGER.info("Preparing platform for " + collection);
}

Stream<File> searchCollectionsConfigs = versionManager.getConfigurationVersionFolders().stream()
.filter(versionFolder -> isConfigurationReloadNecessary(versionFolder))
.flatMap(versionFolder -> stream(safe(versionFolder.listFiles(ONLY_NON_HIDDEN_FILES))));
//each one of searchCollectionsConfigs stream elements is a full configuration for a search collection
searchCollectionsConfigs
Collection<File> configFiles = versionManager.getConfigurationVersionFolders().stream()
.filter(this::isConfigurationReloadNecessary)
.flatMap(versionFolder -> stream(safe(versionFolder.listFiles(ONLY_NON_HIDDEN_FILES))))
.filter(file -> platform.isSearchPlatformConfiguration(collection, file))
.sorted()
.peek(searchPlatformConfiguration -> LOGGER.info("RRE: Loading the Search Engine " + platform.getName() + ", configuration version " + searchPlatformConfiguration.getParentFile().getName()))
.forEach(searchPlatformConfiguration -> {
String version = searchPlatformConfiguration.getParentFile().getName();
platform.load(dataToBeIndexed, searchPlatformConfiguration, collection, version);
});
.collect(Collectors.toList());
for (File searchPlatformConfiguration : configFiles) {
LOGGER.info("RRE: Loading the Search Engine " + platform.getName() + ", configuration version " + searchPlatformConfiguration.getParentFile().getName());
String version = searchPlatformConfiguration.getParentFile().getName();
platform.load(dataToBeIndexed, searchPlatformConfiguration, collection, version);
if (!platform.checkCollection(collection, version)) {
throw new SearchPlatformException("Collection check failed for " + collection + " version " + version);
}
}

LOGGER.info("RRE: " + platform.getName() + " has been correctly loaded.");

Expand All @@ -510,7 +538,7 @@ private void prepareData(final String collection, final File dataToBeIndexed) {
LOGGER.info("RRE: target versions are " + String.join(",", versionManager.getConfigurationVersions()));
}

private boolean isConfigurationReloadNecessary( File versionFolder) {
/**
 * Tells whether the configuration held in the given version folder has to be
 * (re)loaded into the search platform.
 *
 * @param versionFolder the folder of the configuration version being checked.
 * @return {@code true} if the version folder or the corpora folder has changed,
 * or if the platform itself requires a refresh.
 */
private boolean isConfigurationReloadNecessary(File versionFolder) {
    // The corpora folder is checked first and unconditionally, before any
    // short-circuiting on the version folder can happen.
    final boolean corporaModified = folderHasChanged(corporaFolder);
    if (folderHasChanged(versionFolder)) {
        return true;
    }
    return corporaModified || platform.isRefreshRequired();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ public boolean isCorporaRequired() {
@Override
public void close() {
// No-op: this implementation performs no work when closed.
}

@Override
public boolean checkCollection(String collection, String version) {
// Always reports the collection as available: no actual check is
// performed here and both arguments are ignored.
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public interface SearchPlatform extends Closeable {
default String getFullyQualifiedDomainName(final String indexName, final String version) {
// Lower-case with a fixed locale so the generated name does not depend on
// the JVM default locale (e.g. "I" maps to a dotless "ı" under Turkish).
return (indexName + "_" + version).toLowerCase(java.util.Locale.ROOT);
}

/**
* Starts this search platform.
*/
Expand Down Expand Up @@ -119,4 +119,15 @@ default String getFullyQualifiedDomainName(final String indexName, final String
* loaded in order to run.
*/
boolean isCorporaRequired();

/**
* Checks whether the collection is available on this search platform.
* Expects the platform to have been initialised (i.e. started, data
* loaded, etc.).
*
* @param collection the name of the collection to check for.
* @param version the version of the collection to check for.
* @return {@code true} if this collection can be reached.
*/
boolean checkCollection(String collection, String version);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.sease.rre.search.api;

/**
 * Checked exception signalling a specific problem raised by a search
 * platform implementation.
 *
 * @author Matt Pearce ([email protected])
 */
public class SearchPlatformException extends Exception {

    /**
     * Creates an exception carrying neither a detail message nor a cause.
     */
    public SearchPlatformException() {
        super();
    }

    /**
     * Creates an exception with the given detail message.
     *
     * @param message the detail message.
     */
    public SearchPlatformException(String message) {
        super(message);
    }

    /**
     * Creates an exception with the given detail message and cause.
     *
     * @param message the detail message.
     * @param cause   the underlying cause of this exception.
     */
    public SearchPlatformException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception wrapping the given cause.
     *
     * @param cause the underlying cause of this exception.
     */
    public SearchPlatformException(Throwable cause) {
        super(cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,52 @@
<artifactId>rre-search-platform-elasticsearch-impl</artifactId>
<version>7.5.0</version>
<name>RRE - Elasticsearch platform binding</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<!-- Profile for running integration tests.
Requires Docker to be running on the test machine. -->
<id>integration</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>
<systemPropertyVariables>
<elasticsearch.version>${project.version}</elasticsearch.version>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>io.sease</groupId>
Expand Down Expand Up @@ -71,5 +117,23 @@
<artifactId>analysis-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-junit-rule</artifactId>
<version>5.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.15.0-rc2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 02b4275

Please sign in to comment.