2. Getting logged events
- Summary
- Overview
- HTTP call
- Prepare extraction parameters
  - Retrieve extraction parameters from memcache
  - Fix any unfetched parameter
  - Create new parameters based on call type
- Extract all logged events
- Parse the extracted events
- Update general counts
- Build and store entities to process
- Update memcache and launch the next step
This part of the process extracts the logged event data from Carto and parses its content to produce a resource-centric data structure.
Log events are stored in Carto per query event: each entry refers to one action in the portal/APIs and can include information on many resources. The main task of this part is to extract that information and produce an individualized record for each resource, merging the information from many different events and calculating per-resource statistics. It is basically a map-reduce operation, where the log events are mapped and their information is reduced to a set of resources, as illustrated in the sketch below.
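As a minimal illustration of that reduction (with entirely hypothetical identifiers and values), a single logged event touching two resources contributes to the resource-centric structure roughly like this:
# Hypothetical illustration of the event-to-resource reduction described above.
# One logged event references two resources in its results_by_resource field...
event = {
    "created_at": "2015-02-10T12:30:00Z",
    "query_terms": "genus:puma",
    "results_by_resource": '{"resource_a": 10, "resource_b": 3}',  # JSON string
}
# ...and, once parsed, it adds to a per-resource structure where the counts of
# many events are accumulated:
resources = {
    "resource_a": {
        "records": 10,
        "query_countries": {"Spain": {"query_country": "Spain", "times": 1}},
        "query_dates": {"2015-02-10": {"query_date": "2015-02-10", "times": 1}},
        "query_terms": {
            "genus:puma": {"query_terms": "genus:puma", "times": 1, "records": 10}
        },
    },
    # "resource_b": {... same structure, with 3 records ...}
}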
It all starts with a POST call to the get_events method of the admin/parser endpoint family. The call is usually made asynchronously by the previous method, init, but it can also be made directly in case a previous attempt at executing the get_events method failed.
The URL for the endpoint is:
http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/get_events
According to usagestats.py, a call to this URL launches a new instance of the GetEvents class, found in the admin.parser.GetEvents module.
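For illustration, a direct call to retry a failed attempt could look like the following sketch (the period value is an assumption, in the YYYYMM form used throughout this process; any HTTP client would do):
# Hypothetical direct invocation of the get_events endpoint, e.g. to retry a
# previously failed attempt. The period value is an assumption (YYYYMM format).
import requests

resp = requests.post(
    "http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/get_events",
    data={"period": "201502"},
)
print(resp.status_code, resp.text)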
Async calls have the disadvantage that the parameters related to each call need to be passed along in some way. This can, however, be easily solved with memcache.
On the post method itself, there is some code to retrieve all the memcache'd parameters from the previous step:
# Retrieve parameters from memcache and request
params = memcache.get_multi([
"period",
"table_name",
"searches_extracted",
"downloads_extracted"
], key_prefix="usagestats_parser_")
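For context, the previous step presumably seeds those keys with something along these lines (a sketch, not the actual init code; note that the flags are handled as strings by the parsing code below):
# Sketch of how the previous step (init) could seed the shared parameters.
# This is an assumption for illustration; the real code lives in the init handler.
from google.appengine.api import memcache

memcache.set_multi(
    {
        "period": "201502",              # assumed YYYYMM value
        "table_name": "query_log_master",
        "searches_extracted": "False",   # parsed below with .lower() == 'true'
        "downloads_extracted": "False",
    },
    key_prefix="usagestats_parser_",
)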
It might happen that this step is called directly (e.g., because of a failure in a previous attempt). In that case, there would be no memcache'd values, so the process looks for the mandatory period parameter in the new call:
# Try to get period from the request in case GetEvents was called directly
try:
self.period = self.request.get("period").lower()
self.params_from_request = True
s = "Version: %s\n" % __version__
s += "Period %s determined from request: %s" % (self.period, self.request)
logging.info(s)
# If not in request, try to get parameters from memcache in case GetEvents was
# called from a previous task.
except Exception:
memcache_keys = ["period", "table_name", "searches_extracted", \
"downloads_extracted"]
params = memcache.get_multi(memcache_keys, key_prefix="usagestats_parser_")
self.period = params['period']
self.params_from_request = False
s = "Version: %s\n" % __version__
s += "Period %s determined from memcache: %s" % (self.period, params)
logging.info(s)
If there is no period to be found anywhere, we can't proceed.
# If period still not there, halt
if self.period is None or len(self.period)==0:
self.error(400)
resp = {
"status": "error",
"message": "Period parameter was not provided."
}
s = "Version: %s\n" % __version__
s += "%s" % resp
logging.error(s)
self.response.write(json.dumps(resp)+"\n")
return
# If Period not already stored, halt
period_key = ndb.Key("Period", self.period)
period_entity = period_key.get()
if not period_entity:
self.error(400)
resp = {
"status": "error",
"message": "Provided period does not exist in datastore",
"data": {
"period": self.period
}
}
logging.error(resp)
self.response.write(json.dumps(resp)+"\n")
return
The remaining parameters are grabbed as well:
# Get the remaining parameters based on the parameter source
if self.params_from_request == True:
# Get parameters from request
# 'table_name' parameter
try:
self.table_name = self.request.get('table_name')
except KeyError:
# Table name not provided, use default
self.table_name = CDB_TABLE
# 'downloads_extracted' parameter
try:
self.downloads_extracted = self.request.get('downloads_extracted').\
lower() == 'true'
except Exception:
s = "Version: %s\n" % __version__
s += "Aborting GetEvents. "
s += "Unable to extract 'downloads_extracted' from request: %s" % request
logging.error(s)
return
# 'searches_extracted' parameter
try:
self.searches_extracted = self.request.get('searches_extracted').\
lower() == 'true'
except KeyError:
s = "Version: %s\n" % __version__
s += "Aborting GetEvents. "
s += "Unable to extract 'searches_extracted' from request: %s" % request
logging.error(s)
return
else:
# Get parameters from memcache
# 'table_name' parameter
try:
self.table_name = params['table_name']
except KeyError:
# default value for 'table_name' if not provided is None
self.table_name = CDB_TABLE
# 'downloads_extracted' parameter
try:
self.downloads_extracted = params['downloads_extracted'].lower() == 'true'
except Exception:
s = "Version: %s\n" % __version__
s += "Aborting GetEvents. "
s += "Unable to extract 'downloads_extracted' from params: %s" % params
logging.error(s)
return
# 'searches_extracted' parameter
try:
self.searches_extracted = params['searches_extracted'].lower() == 'true'
except Exception:
s = "Version: %s\n" % __version__
s += "Aborting GetEvents. "
s += "Unable to extract 'searches_extracted' from params: %s" % params
logging.error(s)
return
This get_events step runs twice: once to extract download data and a second time to extract search data. To keep track of these processes, there are two special memcache variables: downloads_extracted and searches_extracted.
Before doing anything, the values of both parameters are set to False in the previous step (init). This False value indicates the processes have not finished properly. When the first round finishes, downloads_extracted is set to True, so the next time the get_events process is called, it will start extracting searches. When all searches have been extracted, the searches_extracted parameter is set to True. Once both parameters are set to True, the process is able to continue.
This piece of code handles this part:
# Start with downloads
if self.downloads_extracted is False:
self.t = "download"
# and continue with searches
elif self.searches_extracted is False:
self.t = "search"
# if, by mistake, none is True...
else:
# ... call 'process_events' and move on
taskqueue.add(url=URI_PROCESS_EVENTS, queue_name=QUEUENAME)
return
The new t instance variable stores the type of extraction: download or search.
After all these parameters are set, the service can start extracting actual data.
This method queries the Carto SQL API in order to extract all required values for searches or downloads from the specified table (query_log_master by default).
Several steps are required to build the query, and the queries are different for searches and for downloads.
Downloads:
if self.t == 'download':
# Line #6 of SQL is to avoid too large queries
query = "SELECT cartodb_id, lat, lon, created_at, " \
"query AS query_terms, response_records, " \
"results_by_resource " \
"FROM %s " \
"WHERE type='download' "\
"AND octet_length(query)<=1500 " \
"AND download IS NOT NULL " \
"AND download !=''" % self.table_name
Searches:
else:
# Line #6 of SQL is to avoid too large queries
query = "SELECT cartodb_id, lat, lon, created_at, " \
"query AS query_terms, response_records, " \
"results_by_resource " \
"FROM %s " \
"WHERE left(type, 5)='query' " \
"AND octet_length(query)<=1500 " \
"AND results_by_resource IS NOT NULL " \
"AND results_by_resource != '{}' " \
"AND results_by_resource !=''" % self.table_name
- Common fields: cartodb_id, lat, lon, created_at, query, response_records, results_by_resource
- Table name: the value of the table_name parameter; query_log_master by default
- Common constraints: octet_length(query)<=1500. This is required because the query field in the Cloud Datastore model is indexed (see models.QueryTerms), and indexed fields have a maximum allowed length of 1500 bytes.
- Download constraints: download IS NOT NULL AND download != ''
- Search constraints: results_by_resource IS NOT NULL AND results_by_resource != '{}' AND results_by_resource != ''
Only extract data from main portal usage:
query += " and client='portal-prod'"
If the main table (query_log_master) is used, only data from the requested period is taken:
# Only restrict time if using default table
if self.table_name == CDB_TABLE:
queried_date = datetime(
int(self.period[:4]),
int(self.period[-2:]),
1
)
queried_date += timedelta(days=32)
query = add_time_limit(query=query, today=queried_date)
The add_time_limit function (found in the utils module) updates the SQL to include a time constraint:
def add_time_limit(query, today=datetime.today(), lapse='month'):
"""Add time limit to Carto query.
Default behavior is to extract stats from just the last month.
"""
if lapse == 'month':
this_year = today.year
this_month = today.month
if this_month == 1:
limit_year = this_year - 1
limit_month = 12
else:
limit_year = this_year
limit_month = this_month - 1
limit_string = " and extract(year from created_at)=%s" % limit_year
limit_string += " and extract(month from created_at)=%s" % limit_month
query += limit_string
return query
This function adds two time constraints to the SQL, one for the year and one for the month, so only records in the requested month and year are extracted.
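A worked example (assuming a period of 201502) shows how the 32-day shift and the one-month subtraction in add_time_limit land the constraint exactly on the requested period:
# Worked example of the time restriction, assuming period "201502" (YYYYMM).
from datetime import datetime, timedelta

period = "201502"
queried_date = datetime(int(period[:4]), int(period[-2:]), 1)  # 2015-02-01
queried_date += timedelta(days=32)                             # 2015-03-05, next month

# add_time_limit(query, today=queried_date) then subtracts one month, appending:
#   and extract(year from created_at)=2015
#   and extract(month from created_at)=2
# which is precisely the requested period.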
Then, the Carto API is called, with some code to handle timeouts:
try:
data = carto_query(query)
except ApiQueryMaxRetriesExceededError:
self.error(504)
resp = {
"status": "error",
"message": "Could not retrieve data from Carto",
"data": {
"period": self.period,
"event_type": self.t
}
}
self.response.write(json.dumps(resp) + "\n")
return 1
The carto_query function and the ApiQueryMaxRetriesExceededError exception are both defined in the util package.
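Although the actual implementation lives in util, carto_query presumably wraps a call to the Carto SQL API along these lines (account name, authentication and retry details are assumptions):
# Not the actual util.carto_query; just a sketch of the kind of request it
# presumably wraps. The account name and api_key handling are assumptions.
import requests

CARTO_SQL_URL = "https://vertnet.carto.com/api/v2/sql"  # hypothetical account

def carto_query_sketch(query, api_key=None):
    """Run a SQL query against the Carto SQL API and return the result rows."""
    params = {"q": query}
    if api_key:
        params["api_key"] = api_key
    r = requests.get(CARTO_SQL_URL, params=params, timeout=60)
    r.raise_for_status()
    return r.json()["rows"]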
If the call was successful, the data is stored in an instance variable, self.data, for processing.
The parse_events method takes the data stored in the data instance variable and, record by record, applies some transformations.
The main change in this step is turning the data from an event-based list into a resource-based list. Basically, it means doing a map-reduce with the data: map the events, extract the resource-related information and reduce it to a single list of resources with event counts.
Data is taken from the data instance variable, and results are stored in a native Python dictionary called resources.
Each query event has some values common to all the resources queried in that event:
- event_created: date of creation
- event_country: country where the query was performed
- event_terms: the actual query that was sent to the portal or API
event_created = datetime.strptime(event['created_at'],
'%Y-%m-%dT%H:%M:%SZ')
# Keep just YMD
event_created = event_created.strftime('%Y-%m-%d')
(...)
event_country = geonames_query(event['lat'], event['lon'])
event_terms = event['query_terms']
The definition of the geonames_query method can be found in the util package.
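While the real geonames_query is defined in util, it presumably performs a GeoNames reverse-geocoding lookup of this kind (the endpoint shown is the public GeoNames countryCode service; the username is a placeholder):
# Not the actual util.geonames_query; a sketch of a GeoNames reverse-geocoding
# call that maps a lat/lon pair to a country. The username is a placeholder.
import requests

def geonames_query_sketch(lat, lon, username="demo"):
    r = requests.get(
        "http://api.geonames.org/countryCodeJSON",
        params={"lat": lat, "lng": lon, "username": username},
        timeout=30,
    )
    r.raise_for_status()
    return r.json().get("countryName", "Unknown")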
Data about the resources found in that query are in the results_by_resource field of the Carto response. This field has the following structure:
{resource_1: number_of_records, resource_2: number_of_records, ...}
These results are stored in an inner variable called event_results and transformed into a native Python dictionary:
event_results = json.loads(event['results_by_resource'])
Then, for each of the keys in the dictionary (i.e., for each of the resource ids), it performs the following actions.
First, it initializes a new entry, but only if this is the first time the resource appears. The fields of the dictionary entry reflect the structure of the Cloud Datastore model, which will be used in the next step.
# Initialize resource if not existing
if resource not in resources:
resources[resource] = {
'records': 0,
'query_countries': {},
'query_dates': {},
'query_terms': {}
}
- records will store the number of records searched/downloaded across all events
- query_countries will store a breakdown of the countries that queried data from the resource
- query_dates will store a breakdown of the dates on which queries were made
- query_terms will store a breakdown of the terms that were used to query data from the resource
The number of records in the event is then added to the running count in the records field:
resources[resource]['records'] += event_results[resource]
If the country of the query is already in the query_countries dictionary, it simply adds 1 to the number of times this country has queried the resource. Otherwise, a new entry is added to the query_countries dictionary:
# Add query country
if event_country not in resources[resource]['query_countries']:
resources[resource]['query_countries'][event_country] = {
'query_country': event_country,
'times': 1
}
else:
resources[resource]['query_countries'][event_country]['times'] += 1
The same logic applies to the query dates as to the query countries:
# Add query date
if event_created not in resources[resource]['query_dates']:
resources[resource]['query_dates'][event_created] = {
'query_date': event_created,
'times': 1
}
else:
resources[resource]['query_dates'][event_created]['times'] += 1
The same goes for the query terms, with one difference: the process also stores how many records were found with those terms:
# Add query terms
if event_terms not in resources[resource]['query_terms']:
resources[resource]['query_terms'][event_terms] = {
'query_terms': event_terms,
'times': 1,
'records': event_results[resource]
}
else:
resources[resource]['query_terms'][event_terms]['times'] += 1
resources[resource]['query_terms'][event_terms]['records'] += event_results[resource]
After all resources in all events have been processed, the resources dictionary containing all the "map-reduced" information is stored in an instance variable for further processing:
# Store 'resources' in class property
self.resources = resources
At this point, the process has finished retrieving and reformatting data for either downloads or searches. This part of the process updates the corresponding counts in the Period entity in the datastore.
It first grabs the Period entity:
# Get Period entity
period_key = ndb.Key("Period", self.period)
period_entity = period_key.get()
And adds two new properties to the entity:
-
[downloads|searches]_in_period
: this number reflects how many download or search events happened in the period. -
[downloads|searches]_to_process
: this number is the count of resources to process, the length of theresources
dictionary.
# Update (downloads|searches)_in_period and
# (downloads|searches)_to_process in Period
if self.t == 'download':
period_entity.downloads_in_period = len(self.data)
period_entity.records_downloaded_in_period = \
sum([int(x['response_records']) for x in self.data])
period_entity.downloads_to_process = len(self.resources)
elif self.t == 'search':
period_entity.searches_in_period = len(self.data)
period_entity.records_searched_in_period = \
sum([int(x['response_records']) for x in self.data])
period_entity.searches_to_process = len(self.resources)
The in-period counts are used in the reports. The to-process count is used in the next step, when dictionary entries are transformed into datastore Report entities.
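For reference, those counting properties correspond to fields on the Period model in the models module; a hedged sketch (property names taken from the code above, property types assumed) could look like this:
# Hedged sketch of the counting properties on Period; the authoritative
# definition is in the project's models module, and the types are assumptions.
from google.appengine.ext import ndb

class Period(ndb.Model):
    downloads_in_period = ndb.IntegerProperty()
    records_downloaded_in_period = ndb.IntegerProperty()
    downloads_to_process = ndb.IntegerProperty()
    searches_in_period = ndb.IntegerProperty()
    records_searched_in_period = ndb.IntegerProperty()
    searches_to_process = ndb.IntegerProperty()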
The Period entity with the new values is then updated, with some code to catch errors:
# Store updated period data
k = period_entity.put()
if k != period_key:
logging.error("Could not update %s counts in period" % self.t)
self.error(500)
resp = {
"status": "error",
"message": "Could not update %s counts in period" % self.t,
"data": {
"period": self.period,
"event_type": self.t
}
}
self.response.write(json.dumps(resp) + "\n")
return 1
else:
logging.info("Period counts for %s events updated" % self.t)
return 0
In this step, the data in the instance variable is moved to the datastore, to a temporary location in the ReportToProcess entities (models module). From there, the next step, ProcessEvents, will asynchronously take those data and transform them into their final shape.
So, for each entry in the resources dictionary, this function creates a new ReportToProcess entity, and then stores them all in a single call:
# Build temporary entities
logging.info("Storing %d resources" % len(self.resources))
r = []
for resource in self.resources:
params = {
"t": self.t,
"gbifdatasetid": resource,
"resource": self.resources[resource]
}
r.append(ReportToProcess(**params))
# Store temporary entities
logging.info("Putting %d entities" % len(r))
sr = ndb.put_multi(r)
There is also a sanity check to make sure all the reports that need to be processed are stored in this "collection":
# Check
if len(sr) != len(r):
logging.error("Not all resources were put to process.")
self.error(500)
resp = {
"status": "error",
"message": "Not all resources were put to process.",
"data": {
"period": self.period,
"t": self.t,
"resources": len(r),
"to_process": len(sr)
}
}
self.response.write(json.dumps(resp) + "\n")
return
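The ReportToProcess kind only needs to hold those three values (t, gbifdatasetid and resource) until ProcessEvents picks them up; a minimal sketch of such a model (the real definition is in the models module, and the property types are assumptions) might be:
# Minimal sketch of the temporary entity; the real ReportToProcess model lives
# in the models module, so the property types here are assumptions.
from google.appengine.ext import ndb

class ReportToProcess(ndb.Model):
    t = ndb.StringProperty()              # "download" or "search"
    gbifdatasetid = ndb.StringProperty()  # resource identifier
    resource = ndb.JsonProperty()         # map-reduced counts for the resource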
At the beginning of this document, we talked about the searches_extracted and downloads_extracted parameters, and how both are set to False at the beginning of the process. Now, after the extraction of searches or downloads has finished, the corresponding parameter is set to True to avoid unnecessary repetition:
# Update memcache
if self.t == "search":
memcache.set("usagestats_parser_searches_extracted", True)
else:
memcache.set("usagestats_parser_downloads_extracted", True)
Then, it checks whether both are True. If so, the next step is launched. Otherwise, the same step is launched again to finish the remaining extraction:
# If both are True, end now
p = memcache.get_multi(["searches_extracted", "downloads_extracted"],
key_prefix="usagestats_parser_")
if p['searches_extracted'] is True and\
p['downloads_extracted'] is True:
# Call 'process_events'
logging.info("All searches and downloads extracted")
taskqueue.add(url=URI_PROCESS_EVENTS,
queue_name=QUEUENAME)
else:
taskqueue.add(url=URI_GET_EVENTS,
queue_name=QUEUENAME)
And it returns a JSON document with some stats:
# Build response
resp = {
"status": "success",
"message": "All %s events downloaded and parsed" % self.t,
"data": {
"period": self.period,
"event_type": self.t,
"event_number": len(self.data),
"resources_to_process": len(self.resources)
}
}
self.response.write(json.dumps(resp) + "\n")
This repository is part of the VertNet project. For more information, please check out the project's home page and GitHub organization page.