Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
max-ostapenko committed Dec 11, 2024
1 parent e591526 commit b50b7a7
Showing 1 changed file with 1 addition and 108 deletions.
109 changes: 1 addition & 108 deletions infra/bigquery-export/firestore.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ export class FirestoreBatch {
this.firestore = new Firestore()
this.bigquery = new BigQueryExport()
this.firestore.settings({
databaseId: 'tech-report-apis-prod',
timeout: 60000, // TODO: check if these are needed
commitTimeout: 600 * 1000 // needed?
databaseId: 'tech-report-apis-prod'
})
this.batchSize = 500
this.maxConcurrentBatches = 200
Expand Down Expand Up @@ -153,108 +151,3 @@ export class FirestoreBatch {
await this.streamFromBigQuery(rowStream)
}
}

/**
 * Copies BigQuery query results into a Firestore collection through a
 * Firestore BulkWriter, and can bulk-delete existing documents.
 *
 * `collectionName`, `collectionType` and `date` are assigned by `export()`;
 * `bulkDelete()` and `streamFromBigQuery()` read them from the instance.
 */
export class FirestoreBulkWriter {
  constructor () {
    this.firestore = new Firestore()
    this.bigquery = new BigQueryExport()
    this.firestore.settings({
      databaseId: 'tech-report-apis-prod',
      commitTimeout: 600 * 1000
    })
    // bulkDelete() fetches batchSize * maxConcurrentBatches documents per page.
    this.batchSize = 500
    this.maxConcurrentBatches = 200
  }

  /**
   * Deletes documents from `this.collectionName`, page by page.
   *
   * For the 'report' collection type only documents whose `date` field equals
   * `this.date` are deleted; for 'dict' the whole collection is deleted.
   *
   * @returns {Promise<void>}
   * @throws {Error} If `this.collectionType` is neither 'report' nor 'dict',
   *   or if any delete operation fails.
   */
  async bulkDelete () {
    console.info('Starting bulk deletion...')
    const startTime = Date.now()
    let totalDocsDeleted = 0
    const collectionRef = this.firestore.collection(this.collectionName)

    // Validate the collection type before creating a writer, so an invalid
    // configuration does not leave an unclosed BulkWriter behind.
    let collectionQuery
    if (this.collectionType === 'report') {
      console.info(`Deleting documents from ${this.collectionName} for date ${this.date}`)
      // Query to fetch monthly documents
      collectionQuery = collectionRef.where('date', '==', this.date)
    } else if (this.collectionType === 'dict') {
      console.info(`Deleting documents from ${this.collectionName}`)
      collectionQuery = collectionRef
    } else {
      throw new Error('Invalid collection type')
    }

    const pageQuery = collectionQuery.limit(this.batchSize * this.maxConcurrentBatches)
    const bulkWriter = this.firestore.bulkWriter()

    // Register the error handler once per writer, not once per document.
    // Returning false tells the writer not to retry the failed operation.
    let failedDeletes = 0
    bulkWriter.onWriteError((error) => {
      console.error('Error in bulk delete:', error)
      failedDeletes++
      return false
    })

    try {
      while (true) {
        const snapshot = await pageQuery.get()
        if (snapshot.empty) {
          break
        }

        for (const doc of snapshot.docs) {
          bulkWriter.delete(doc.ref)
        }

        // Wait for this page to be committed before querying again; otherwise
        // the next query returns the same, not-yet-deleted documents.
        await bulkWriter.flush()

        if (failedDeletes > 0) {
          // Documents that cannot be deleted would be returned by every
          // following query, so stop here instead of looping forever.
          throw new Error(`Bulk delete aborted: ${failedDeletes} document(s) could not be deleted`)
        }
        totalDocsDeleted += snapshot.docs.length
      }
    } finally {
      // Ensure all delete operations are completed and the writer is released
      await bulkWriter.close()
    }

    const duration = (Date.now() - startTime) / 1000
    console.info(`Deletion complete. Total docs deleted: ${totalDocsDeleted}. Time: ${duration} seconds`)
  }

  /**
   * Streams BigQuery query results into a Firestore collection using bulk writer.
   *
   * Each row is written to `this.collectionName` under the document ID
   * returned by `technologyHashId`.
   *
   * @param {ReadableStream} rowStream - Stream of rows from BigQuery; consumed
   *   with `for await`, so any async iterable of row objects works.
   * @returns {Promise<void>}
   */
  async streamFromBigQuery (rowStream) {
    console.info('Starting BigQuery to Firestore transfer...')
    const startTime = Date.now()
    let totalRowsProcessed = 0

    // The target collection does not change per row, so resolve it once.
    const collectionRef = this.firestore.collection(this.collectionName)
    const bulkWriter = this.firestore.bulkWriter()

    // Register the error handler once per writer, not once per row.
    // Returning false tells the writer not to retry the failed operation.
    bulkWriter.onWriteError((error) => {
      console.error('Error in bulk write:', error)
      return false
    })

    try {
      for await (const row of rowStream) {
        // Generate document ID using the existing hash function
        const docId = technologyHashId(row, this.collectionName, TECHNOLOGY_QUERY_ID_KEYS)
        bulkWriter.set(collectionRef.doc(docId), row)
        totalRowsProcessed++
      }
    } finally {
      // Ensure all write operations are completed, even if the stream fails
      await bulkWriter.close()
    }

    const duration = (Date.now() - startTime) / 1000
    console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${totalRowsProcessed}. Time: ${duration} seconds`)
  }

  /**
   * Runs `query` in BigQuery and writes the resulting rows to Firestore.
   *
   * Existing documents are not deleted first; call `bulkDelete()` separately
   * if the target collection must be emptied before writing.
   *
   * @param {{date: *, name: string, type: string}} config - Target collection
   *   settings: `name` is the collection name, `type` is 'report' or 'dict',
   *   and `date` is the value matched against the `date` field for reports.
   * @param {*} query - Passed unchanged to `BigQueryExport.queryResultsStream`.
   * @returns {Promise<void>}
   */
  async export (config, query) {
    this.date = config.date
    this.collectionName = config.name
    this.collectionType = config.type

    const rowStream = await this.bigquery.queryResultsStream(query)
    await this.streamFromBigQuery(rowStream)
  }
}

0 comments on commit b50b7a7

Please sign in to comment.