Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist patterns #394

Merged
merged 13 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions src/main/java/com/conveyal/gtfs/GTFSFeed.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ else if (feedId == null || feedId.isEmpty()) {
this.fares.putAll(fares);
fares = null; // free memory

new Pattern.Loader(this).loadTable(zip);
new Route.Loader(this).loadTable(zip);
new ShapePoint.Loader(this).loadTable(zip);
new Stop.Loader(this).loadTable(zip);
Expand Down Expand Up @@ -216,6 +217,7 @@ public void toFile (String file) {
new Transfer.Writer(this).writeTable(zip);
new Trip.Writer(this).writeTable(zip);
new StopTime.Writer(this).writeTable(zip);
new Pattern.Writer(this).writeTable(zip);

zip.close();

Expand Down Expand Up @@ -343,27 +345,6 @@ public Shape getShape (String shape_id) {
return shape.shape_dist_traveled.length > 0 ? shape : null;
}

/**
 * MapDB-based implementation to find patterns.
 *
 * FIXME: Remove and make pattern finding happen during validation? We want to share the pattern finder between the
 * two implementations (MapDB and RDBMS), apply the same validation process to both kinds of storage, and produce
 * Patterns in the same way in both cases, during validation. This prevents us from iterating over every stopTime
 * twice, since we're already iterating over all of them in validation. However, in this case it might not be costly
 * to simply retrieve the stop times from the stop_times map.
 */
public void findPatterns () {
    PatternFinder patternFinder = new PatternFinder();
    // Feed every trip, with its stop times in stop_sequence order, into the pattern finder.
    for (Trip trip : this.trips.values()) {
        patternFinder.processTrip(trip, this.getOrderedStopTimesForTrip(trip.trip_id));
    }
    // Build Pattern objects from the binned trips, then index them by pattern ID.
    // Collectors.toMap intentionally fails fast on duplicate pattern IDs.
    Map<TripPatternKey, Pattern> patternsByKey = patternFinder.createPatternObjects(this.stops, null);
    this.patterns.putAll(
        patternsByKey.values().stream().collect(Collectors.toMap(Pattern::getId, pattern -> pattern))
    );
}

/**
* For the given trip ID, fetch all the stop times in order, and interpolate stop-to-stop travel times.
*/
Expand Down
242 changes: 242 additions & 0 deletions src/main/java/com/conveyal/gtfs/PatternBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package com.conveyal.gtfs;

import com.conveyal.gtfs.loader.BatchTracker;
import com.conveyal.gtfs.loader.Feed;
import com.conveyal.gtfs.loader.Requirement;
import com.conveyal.gtfs.loader.Table;
import com.conveyal.gtfs.model.Pattern;
import com.conveyal.gtfs.model.PatternStop;
import org.apache.commons.dbutils.DbUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Set;

import static com.conveyal.gtfs.loader.JdbcGtfsLoader.copyFromFile;
import static com.conveyal.gtfs.model.Entity.INT_MISSING;
import static com.conveyal.gtfs.model.Entity.setDoubleParameter;
import static com.conveyal.gtfs.model.Entity.setIntParameter;

public class PatternBuilder {

private static final Logger LOG = LoggerFactory.getLogger(PatternBuilder.class);

private final Feed feed;
private static final String TEMP_FILE_NAME = "pattern_for_trips";

private final Connection connection;
public PatternBuilder(Feed feed) throws SQLException {
this.feed = feed;
connection = feed.getConnection();
}

private String getTableName(String tableName) {
return feed.tablePrefix + tableName;
}
br648 marked this conversation as resolved.
Show resolved Hide resolved

public void create(Map<TripPatternKey, Pattern> patterns, Set<String> patternIdsLoadedFromFile) {
String patternsTableName = getTableName("patterns");
String tripsTableName = getTableName("trips");
String patternStopsTableName = getTableName("pattern_stops");

Table patternsTable = new Table(patternsTableName, Pattern.class, Requirement.EDITOR, Table.PATTERNS.fields);
Table patternStopsTable = new Table(patternStopsTableName, PatternStop.class, Requirement.EDITOR, Table.PATTERN_STOP.fields);

try {
File tempPatternForTripsTextFile = File.createTempFile(TEMP_FILE_NAME, "text");
LOG.info("Creating pattern and pattern stops tables.");
Statement statement = connection.createStatement();
statement.execute(String.format("alter table %s add column pattern_id varchar", tripsTableName));
br648 marked this conversation as resolved.
Show resolved Hide resolved
if (patternIdsLoadedFromFile.isEmpty()) {
// No patterns were loaded from file so the pattern table has not previously been created.
patternsTable.createSqlTable(connection, null, true);
}
patternStopsTable.createSqlTable(connection, null, true);
PrintStream patternForTripsFileStream = createTempPatternForTripsTable(tempPatternForTripsTextFile, statement);
processPatternAndPatternStops(patternsTable, patternStopsTable, patternForTripsFileStream, patterns, patternIdsLoadedFromFile);
updateTripPatternIds(tempPatternForTripsTextFile, patternForTripsFileStream, statement, tripsTableName);
createIndexes(statement, patternsTableName, patternStopsTableName, tripsTableName);
connection.commit();
} catch (SQLException | IOException e) {
// Rollback transaction if failure occurs on creating patterns.
DbUtils.rollbackAndCloseQuietly(connection);
// This exception will be stored as a validator failure.
throw new RuntimeException(e);
} finally {
// Close transaction finally.
if (connection != null) DbUtils.closeQuietly(connection);
}
}

private void processPatternAndPatternStops(
Table patternsTable,
Table patternStopsTable,
PrintStream patternForTripsFileStream,
Map<TripPatternKey, Pattern> patterns,
Set<String> patternIdsLoadedFromFile
) throws SQLException {
// Generate prepared statements for inserts.
String insertPatternSql = patternsTable.generateInsertSql(true);
PreparedStatement insertPatternStatement = connection.prepareStatement(insertPatternSql);
BatchTracker patternTracker = new BatchTracker("pattern", insertPatternStatement);
LOG.info("Storing patterns and pattern stops.");
for (Map.Entry<TripPatternKey, Pattern> entry : patterns.entrySet()) {
Pattern pattern = entry.getValue();
LOG.debug("Batching pattern {}", pattern.pattern_id);
if (!patternIdsLoadedFromFile.contains(pattern.pattern_id)) {
// Only insert the pattern if it has not already been imported from file.
pattern.setStatementParameters(insertPatternStatement, true);
patternTracker.addBatch();
}
createPatternStops(entry.getKey(), pattern.pattern_id, patternStopsTable);
updateTripPatternReferences(patternForTripsFileStream, pattern);
}
// Send any remaining prepared statement calls to the database backend.
patternTracker.executeRemaining();
LOG.info("Done storing patterns and pattern stops.");
}

/**
* Create temp table for updating trips with pattern IDs to be dropped at the end of the transaction.
* NOTE: temp table name must NOT be prefixed with schema because temp tables are prefixed with their own
* connection-unique schema.
*/
private PrintStream createTempPatternForTripsTable(
File tempPatternForTripsTextFile,
Statement statement
) throws SQLException, IOException {
LOG.info("Loading via temporary text file at {}.", tempPatternForTripsTextFile.getAbsolutePath());
String createTempSql = String.format("create temp table %s(trip_id varchar, pattern_id varchar) on commit drop", TEMP_FILE_NAME);
LOG.info(createTempSql);
statement.execute(createTempSql);
return new PrintStream(new BufferedOutputStream(Files.newOutputStream(tempPatternForTripsTextFile.toPath())));
}

/**
* Update all trips on this pattern to reference this pattern's ID.
*/
private void updateTripPatternReferences(PrintStream patternForTripsFileStream, Pattern pattern) {
// Prepare each trip in pattern to update trips table.
for (String tripId : pattern.associatedTrips) {
// Add line to temp csv file if using postgres.
// No need to worry about null trip IDs because the trips have already been processed.
String[] strings = new String[]{tripId, pattern.pattern_id};
// Print a new line in the standard postgres text format:
// https://www.postgresql.org/docs/9.1/static/sql-copy.html#AEN64380
patternForTripsFileStream.println(String.join("\t", strings));
}
}

/**
* Copy the pattern for trips text file into a table, create an index on trip IDs, and update the trips
* table.
*/
private void updateTripPatternIds(
File tempPatternForTripsTextFile,
PrintStream patternForTripsFileStream,
Statement statement,
String tripsTableName
) throws SQLException, IOException {

br648 marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("Updating trips with pattern IDs.");
patternForTripsFileStream.close();
br648 marked this conversation as resolved.
Show resolved Hide resolved
// Copy file contents into temp pattern for trips table.
copyFromFile(connection, tempPatternForTripsTextFile, TEMP_FILE_NAME);
// Before updating the trips with pattern IDs, index the table on trip_id.
String patternForTripsIndexSql = String.format(
"create index temp_trips_pattern_id_idx on %s (trip_id)",
TEMP_FILE_NAME
);
LOG.info(patternForTripsIndexSql);
statement.execute(patternForTripsIndexSql);
// Finally, execute the update statement.
String updateTripsSql = String.format(
"update %s set pattern_id = %s.pattern_id from %s where %s.trip_id = %s.trip_id",
br648 marked this conversation as resolved.
Show resolved Hide resolved
tripsTableName,
TEMP_FILE_NAME,
TEMP_FILE_NAME,
tripsTableName,
TEMP_FILE_NAME
);
LOG.info(updateTripsSql);
statement.executeUpdate(updateTripsSql);
// Delete temp file. Temp table will be dropped after the transaction is committed.
Files.delete(tempPatternForTripsTextFile.toPath());
LOG.info("Updating trips complete.");
}

private void createIndexes(
Statement statement,
String patternsTableName,
String patternStopsTableName,
String tripsTableName
) throws SQLException {
LOG.info("Creating index on patterns.");
statement.executeUpdate(String.format("alter table %s add primary key (pattern_id)", patternsTableName));
LOG.info("Creating index on pattern stops.");
statement.executeUpdate(String.format("alter table %s add primary key (pattern_id, stop_sequence)", patternStopsTableName));
// Index new pattern_id column on trips. The other tables are already indexed because they have primary keys.
LOG.info("Indexing trips on pattern id.");
statement.execute(String.format("create index trips_pattern_id_idx on %s (pattern_id)", tripsTableName));
LOG.info("Done indexing.");
}

/**
* Construct pattern stops based on values in trip pattern key.
*/
private void createPatternStops(TripPatternKey key, String patternId, Table patternStopsTable) throws SQLException {

String insertPatternStopSql = patternStopsTable.generateInsertSql(true);
PreparedStatement insertPatternStopStatement = connection.prepareStatement(insertPatternStopSql);
BatchTracker patternStopTracker = new BatchTracker("pattern stop", insertPatternStopStatement);

int lastValidDeparture = key.departureTimes.get(0);
for (int i = 0; i < key.stops.size(); i++) {
int travelTime = 0;
String stopId = key.stops.get(i);
int arrival = key.arrivalTimes.get(i);
if (i > 0) {
int prevDeparture = key.departureTimes.get(i - 1);
// Set travel time for all stops except the first.
if (prevDeparture != INT_MISSING) {
// Update the previous departure if it's not missing. Otherwise, base travel time based on the
// most recent valid departure.
lastValidDeparture = prevDeparture;
}
travelTime = arrival == INT_MISSING || lastValidDeparture == INT_MISSING
? INT_MISSING
: arrival - lastValidDeparture;
}
int departure = key.departureTimes.get(i);
int dwellTime = arrival == INT_MISSING || departure == INT_MISSING
? INT_MISSING
: departure - arrival;

insertPatternStopStatement.setString(1, patternId);
// Stop sequence is zero-based.
setIntParameter(insertPatternStopStatement, 2, i);
insertPatternStopStatement.setString(3, stopId);
insertPatternStopStatement.setString(4, key.stopHeadsigns.get(i));
setIntParameter(insertPatternStopStatement,5, travelTime);
setIntParameter(insertPatternStopStatement,6, dwellTime);
setIntParameter(insertPatternStopStatement,7, key.dropoffTypes.get(i));
setIntParameter(insertPatternStopStatement,8, key.pickupTypes.get(i));
setDoubleParameter(insertPatternStopStatement, 9, key.shapeDistances.get(i));
setIntParameter(insertPatternStopStatement,10, key.timepoints.get(i));
setIntParameter(insertPatternStopStatement,11, key.continuous_pickup.get(i));
setIntParameter(insertPatternStopStatement,12, key.continuous_drop_off.get(i));
patternStopTracker.addBatch();
}
patternStopTracker.executeRemaining();
}
}
24 changes: 1 addition & 23 deletions src/main/java/com/conveyal/gtfs/PatternFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,12 @@
import com.conveyal.gtfs.error.NewGTFSErrorType;
import com.conveyal.gtfs.error.SQLErrorStorage;
import com.conveyal.gtfs.model.Pattern;
import com.conveyal.gtfs.model.PatternStop;
import com.conveyal.gtfs.model.ShapePoint;
import com.conveyal.gtfs.model.Stop;
import com.conveyal.gtfs.model.StopTime;
import com.conveyal.gtfs.model.Trip;
import com.conveyal.gtfs.validator.service.GeoUtils;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.CoordinateList;
import org.locationtech.jts.geom.LineString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,7 +23,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.conveyal.gtfs.util.Util.human;

Expand All @@ -50,21 +43,6 @@ public class PatternFinder {

private int nTripsProcessed = 0;

/**
* Bin all trips by the sequence of stops they visit.
* @return A map from a list of stop IDs to a list of Trip IDs that visit those stops in that sequence.
*/
// public void findPatterns(Feed feed) {
//
// for (Trip trip : trips) {
// }
// feed.patterns.stream().forEach(p -> {
// feed.patterns.put(p.pattern_id, p);
// p.associatedTrips.stream().forEach(t -> feed.tripPatternMap.put(t, p.pattern_id));
// });
//
// }

public void processTrip(Trip trip, Iterable<StopTime> orderedStopTimes) {
if (++nTripsProcessed % 100000 == 0) {
LOG.info("trip {}", human(nTripsProcessed));
Expand Down Expand Up @@ -121,7 +99,7 @@ public Map<TripPatternKey, Pattern> createPatternObjects(Map<String, Stop> stopB
* This process requires access to all the stops in the feed.
* Some validators already cache a map of all the stops. There's probably a cleaner way to do this.
*/
public static void renamePatterns(Collection<Pattern> patterns, Map<String, Stop> stopById) {
private static void renamePatterns(Collection<Pattern> patterns, Map<String, Stop> stopById) {
LOG.info("Generating unique names for patterns");

Map<String, PatternNamingInfo> namingInfoForRoute = new HashMap<>();
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/conveyal/gtfs/loader/EntityPopulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.conveyal.gtfs.model.Entity;
import com.conveyal.gtfs.model.FareAttribute;
import com.conveyal.gtfs.model.Frequency;
import com.conveyal.gtfs.model.Pattern;
import com.conveyal.gtfs.model.PatternStop;
import com.conveyal.gtfs.model.Route;
import com.conveyal.gtfs.model.ScheduleException;
Expand Down Expand Up @@ -70,6 +71,17 @@ public interface EntityPopulator<T> {
return patternStop;
};

/**
 * Populates a {@link Pattern} from the current result-set row. Each column is read only when
 * present in {@code columnForName}. Column read order is kept identical to the table layout.
 */
EntityPopulator<Pattern> PATTERN = (result, columnForName) -> {
    Pattern p = new Pattern();
    p.pattern_id    = getStringIfPresent(result, "pattern_id",    columnForName);
    p.route_id      = getStringIfPresent(result, "route_id",      columnForName);
    p.name          = getStringIfPresent(result, "name",          columnForName);
    p.direction_id  = getIntIfPresent   (result, "direction_id",  columnForName);
    p.use_frequency = getIntIfPresent   (result, "use_frequency", columnForName);
    p.shape_id      = getStringIfPresent(result, "shape_id",      columnForName);
    return p;
};

T populate (ResultSet results, TObjectIntMap<String> columnForName) throws SQLException;

EntityPopulator<Agency> AGENCY = (result, columnForName) -> {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/conveyal/gtfs/loader/Feed.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class Feed {
public final TableReader<Stop> stops;
public final TableReader<Trip> trips;
public final TableReader<StopTime> stopTimes;
public final TableReader<Pattern> patterns;

/**
* Create a feed that reads tables over a JDBC connection. The connection should already be set to the right
Expand All @@ -59,6 +60,7 @@ public Feed (DataSource dataSource, String tablePrefix) {
stops = new JDBCTableReader(Table.STOPS, dataSource, tablePrefix, EntityPopulator.STOP);
trips = new JDBCTableReader(Table.TRIPS, dataSource, tablePrefix, EntityPopulator.TRIP);
stopTimes = new JDBCTableReader(Table.STOP_TIMES, dataSource, tablePrefix, EntityPopulator.STOP_TIME);
patterns = new JDBCTableReader(Table.PATTERNS, dataSource, tablePrefix, EntityPopulator.PATTERN);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/conveyal/gtfs/loader/FeedLoadResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class FeedLoadResult implements Serializable {
public TableLoadResult fareRules;
public TableLoadResult feedInfo;
public TableLoadResult frequencies;
public TableLoadResult patterns;
public TableLoadResult routes;
public TableLoadResult shapes;
public TableLoadResult stops;
Expand Down
Loading