Skip to content

Commit

Permalink
Merge pull request #364 from metrico/prom_support
Browse files Browse the repository at this point in the history
3.0: Full Prometheus support
  • Loading branch information
akvlad authored Oct 27, 2023
2 parents 79c374c + 5e4dc2e commit 334a03b
Show file tree
Hide file tree
Showing 16 changed files with 3,513 additions and 102 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ node_modules
/test/e2e/
/lib/influx/.idea/
/lib/influx/influx.iml
/wasm_parts/_vendor.zip
/wasm_parts/.idea/
/wasm_parts/vendor/
/wasm_parts/main.wasm
/wasm_parts/wasm_parts.iml
13 changes: 10 additions & 3 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,13 @@ const outputTempoSearch = async (dataStream, res) => {
*/
const preprocessStream = (rawStream, processors) => {
let dStream = StringStream.from(rawStream.data).lines().endWith(JSON.stringify({ EOF: true }))
.map(chunk => chunk ? JSON.parse(chunk) : ({}), DataStream)
.map(chunk => {
try {
return chunk ? JSON.parse(chunk) : ({})
} catch (e) {
return {}
}
}, DataStream)
.map(chunk => {
try {
if (!chunk || !chunk.labels) {
Expand Down Expand Up @@ -1344,15 +1350,16 @@ const samplesReadTable = {
* @param query {string}
* @param data {string | Buffer | Uint8Array}
* @param database {string}
* @param config {Object?}
* @returns {Promise<AxiosResponse<any>>}
*/
const rawRequest = (query, data, database) => {
const rawRequest = (query, data, database, config) => {
const getParams = [
(database ? `database=${encodeURIComponent(database)}` : null),
(data ? `query=${encodeURIComponent(query)}` : null)
].filter(p => p)
const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
return axios.post(url, data || query)
return axios.post(url, data || query, config)
}

/**
Expand Down
2 changes: 0 additions & 2 deletions lib/handlers/prom_push.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ async function handler (req, res) {
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
finger = fingerPrint(strJson)
req.log.debug({ labels: stream.labels, finger }, 'LABELS FINGERPRINT')
labels.add(finger.toString(), stream.labels)

const dates = {}
if (stream.samples) {
stream.samples.forEach(function (entry) {
req.log.debug({ entry, finger }, 'BULK ROW')
if (
!entry &&
!entry.timestamp &&
Expand Down
33 changes: 10 additions & 23 deletions lib/handlers/prom_query.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/* Emulated PromQL Query Handler */

const { p2l } = require('@qxip/promql2logql');
const { asyncLogError, CORS } = require('../../common')
const { instantQueryScan } = require('../db/clickhouse')
const { instantQuery } = require('../../promql')
const empty = '{"status" : "success", "data" : {"resultType" : "scalar", "result" : []}}'; // to be removed
const test = () => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, "2"]}}`; // to be removed
const exec = (val) => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, val]}}`; // to be removed


async function handler (req, res) {
req.log.debug('GET /loki/api/v1/query')
const resp = {
Expand All @@ -25,34 +23,23 @@ async function handler (req, res) {
}
if (req.query.query === '1+1') {
return res.status(200).send(test())
}
else if (!isNaN(parseInt(req.query.query))) {
} else if (!isNaN(parseInt(req.query.query))) {
return res.status(200).send(exec(parseInt(req.query.query)))
}
/* remove newlines */
req.query.query = req.query.query.replace(/\n/g, ' ')
req.query.time = req.query.time ? parseInt(req.query.time) * 1000 : Date.now()
/* transpile to logql */
try {
req.query.query = p2l(req.query.query)
} catch(e) {
asyncLogError({ e }, req.log)
return res.send(empty)
}
/* scan fingerprints */
/* TODO: handle time tag + direction + limit to govern the query output */
try {
const response = await instantQueryScan(
req.query
)
res.code(200)
res.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
})
return response
const response = await instantQuery(req.query.query, req.query.time)
return res.code(200)
.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
}).send(response)
} catch (err) {
asyncLogError(err, req.log)
return res.send(empty)
return res.code(500).send(JSON.stringify({ status: 'error', error: err.message }))
}
}

Expand Down
46 changes: 8 additions & 38 deletions lib/handlers/prom_query_range.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,19 @@
regexp: a regex to filter the returned results, will eventually be rolled into the query language
*/

const { p2l } = require('@qxip/promql2logql')
const { asyncLogError, CORS } = require('../../common')
const { scanFingerprints } = require('../db/clickhouse')
const { rangeQuery } = require('../../promql/index')

async function handler (req, res) {
req.log.debug('GET /api/v1/query_range')
const resp = {
status: "success",
data: {
resultType: "vector",
result: []
}
}
if (req.method === 'POST') {
req.query = req.body
}
if (!req.query.query) {
return res.send(resp)
}
/* remove newlines */
req.query.query = req.query.query.replace(/\n/g, ' ')
if (!req.query.query) {
return res.code(400).send('invalid query')
}
// Convert PromQL to LogQL and execute
const startMs = parseInt(req.query.start) * 1000 || Date.now() - 60000
const endMs = parseInt(req.query.end) * 1000 || Date.now()
const stepMs = parseInt(req.query.step) * 1000 || 15000
const query = req.query.query
try {
req.query.query = p2l(req.query.query)
const response = await scanFingerprints(
{
...req.query,
start: parseInt(req.query.start) * 1e9,
end: parseInt(req.query.end) * 1e9
}
)
res.code(200)
res.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
})
return response
const result = await rangeQuery(query, startMs, endMs, stepMs)
return res.code(200).send(result)
} catch (err) {
asyncLogError(err, req.log)
return res.send(resp)
return res.code(500).send(JSON.stringify({ status: 'error', error: err.message }))
}
}

Expand Down
55 changes: 20 additions & 35 deletions lib/handlers/prom_series.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
const { scanSeries } = require('../db/clickhouse')
const { CORS } = require('../../common')
const { Compiler } = require('bnf')
const { isArray } = require('handlebars-helpers/lib/array')
const { QrynError } = require('./errors')

const promqlSeriesBnf = `
<SYNTAX> ::= <metric_name><OWSP> | "{" <OWSP> <label_selectors> <OWSP> "}" | <metric_name><OWSP> "{" <OWSP> [<label_selectors>] <OWSP> "}"
label ::= (<ALPHA> | "_") *(<ALPHA> | "." | "_" | <DIGITS>)
operator ::= "=~" | "!~" | "!=" | "="
quoted_str ::= (<QUOTE><QUOTE>) | (<AQUOTE><AQUOTE>) | <QLITERAL> | <AQLITERAL>
metric_name ::= label
label_selector ::= <label> <OWSP> <operator> <OWSP> <quoted_str>
label_selectors ::= <label_selector><OWSP>*(","<OWSP><label_selector><OWSP>)
`

const compiler = new Compiler()
compiler.AddLanguage(promqlSeriesBnf, 'promql_series')
const {series} = require('../../promql/index')

// Series Handler
async function handler (req, res) {
Expand All @@ -30,27 +17,25 @@ async function handler (req, res) {
if (!isArray(query)) {
query = [query]
}
query = query.map((m) => {
let res = '{'
let parsed = compiler.ParseScript(m)
if (!parsed) {
throw new QrynError(400, `invalid series query ${m}`)
}
parsed = parsed.rootToken
res += parsed.Child('metric_name') ? `name="${parsed.Child('metric_name').val}` : ''
res += parsed.Child('metric_name') && parsed.Child('label_selector') ? ',' : ''
res += parsed.Children('label_selector').map(c => c.value.toString()).join(',')
res += '}'
return res
})
// convert the input query into a label selector
const response = await scanSeries(query)
res.code(200)
res.headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
})
return response
const startMs = req.query.start ? parseInt(req.query.start) * 1000 : Date.now() - 7 * 24 * 3600 * 1000
const endMs = req.query.end ? parseInt(req.query.end) * 1000 : Date.now() - 7 * 24 * 3600 * 1000
const result = []
try {
query = query.map(async (m) => {
const _result = await series(m, startMs, endMs)
result.push.apply(result, _result)
})
await Promise.all(query)
return res.code(200).headers({
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': CORS
}).send(JSON.stringify({
status: 'success',
data: result
}))
} catch (err) {
return res.code(500).send(JSON.stringify({ status: 'error', error: err.message }))
}
}

module.exports = handler
2 changes: 1 addition & 1 deletion lib/handlers/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async function handler (req, res) {
req.log.debug('POST /loki/api/v1/push')
if (!req.body) {
await processRawPush(req, DATABASE.cache.labels, bulk_labels, bulk,
toJSON, fingerPrint)
toJson, fingerPrint)
return res.code(200).send()
}
if (readonly) {
Expand Down
Loading

0 comments on commit 334a03b

Please sign in to comment.