Skip to content
This repository has been archived by the owner on Aug 20, 2024. It is now read-only.

Commit

Permalink
Use a named cursor and set itersize to 70k for pagination (#7)
Browse files Browse the repository at this point in the history
* use a named cursor and set itersize to 70k

* make cursor name unique
  • Loading branch information
hgulersen authored Feb 7, 2023
1 parent 5e96786 commit a5fa5a8
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions tap_redshift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# data.world, Inc.(http://data.world/).

import copy
import secrets
import time
from itertools import groupby

Expand Down Expand Up @@ -68,6 +69,8 @@

CONFIG = {}

# Batch size assigned to `cursor.itersize` in sync_table just before the
# SELECT is executed on the named cursor.
# NOTE(review): psycopg2 documents `itersize` as applying when iterating over
# a named cursor; sync_table calls `cursor.fetchone()` after execute — confirm
# the fetch loop actually benefits from this setting.
ROWS_PER_NETWORK_CALL = 70_000


def discover_catalog(conn, db_schema):
'''Returns a Catalog describing the structure of the database.'''
Expand Down Expand Up @@ -114,7 +117,7 @@ def discover_catalog(conn, db_schema):
table_columns = [{'name': k, 'columns': [
{'pos': t[1], 'name': t[2], 'type': t[3],
'nullable': t[4]} for t in v]}
for k, v in groupby(column_specs, key=lambda t: t[0])]
for k, v in groupby(column_specs, key=lambda t: t[0])]

table_pks = {k: [t[1] for t in v]
for k, v in groupby(pk_specs, key=lambda t: t[0])}
Expand Down Expand Up @@ -252,7 +255,8 @@ def open_connection(config):
dbname = config['dbname'],
user = config['user'],
password = config['password']
LOGGER.info(f"Attempting Redshift connection: {dbname[0]} {host[0]} {port[0]}")
LOGGER.info(
f"Attempting Redshift connection: {dbname[0]} {host[0]} {port[0]}")
connection = psycopg2.connect(
host=host[0],
port=port[0],
Expand Down Expand Up @@ -303,7 +307,7 @@ def sync_table(connection, catalog_entry, state):

tap_stream_id = catalog_entry.tap_stream_id
LOGGER.info('Beginning sync for {} table'.format(tap_stream_id))
with connection.cursor() as cursor:
with connection.cursor(f"redshift_cursor_{secrets.token_hex(8)}") as cursor:
schema, table = catalog_entry.table.split('.')
database = catalog_entry.database
select = 'SELECT {} FROM {}.{}.{}'.format(
Expand Down Expand Up @@ -366,6 +370,8 @@ def sync_table(connection, catalog_entry, state):
time_extracted = utils.now()
query_string = cursor.mogrify(select, params)
LOGGER.info('Running {}'.format(query_string))

cursor.itersize = ROWS_PER_NETWORK_CALL
cursor.execute(select, params)
row = cursor.fetchone()
rows_saved = 0
Expand Down

0 comments on commit a5fa5a8

Please sign in to comment.