Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid overriding headers set in onresponse #129

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ Gateway.prototype.stop = function(cb){
gateway.stop(cb);
}

Gateway.prototype.addPlugin = function (name,plugin) {
Gateway.prototype.addPlugin = function (name,plugin,allPluginNames) {
assert(name,"plugin must have a name")
assert(_.isString(name),"name must be a string");
assert(_.isFunction(plugin),"plugin must be a function(config,logger,stats){return {onresponse:function(req,res,data,next){}}}");
const handler = this.pluginLoader.loadPlugin({plugin:plugin,pluginName:name});
const handler = this.pluginLoader.loadPlugin({plugin:plugin,pluginName:name,allPluginNames:allPluginNames});
this.plugins.push(handler);
};

103 changes: 103 additions & 0 deletions lib/PluginsSeqManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
const debug = require('debug')('plugins-seq-manager')


class PluginsSeqManager {

constructor(config, plugins){
this.plugins = plugins;
this.config = config;
this.urlPluginsCache = new Map();
this.defaultPlugins = plugins.filter( p => p.id === 'analytics' || p.id === 'metrics');
this.uniqueUrls = new Set();

if ( this.config.edgemicro.plugins && this.config.edgemicro.plugins.disableExcUrlsCache !== true ) {
debug('Loading all plugins exclude urls in memory');
this.loadAllUrls();
} else {
if ( this.config.edgemicro.plugins && this.config.edgemicro.plugins.excludeUrls ) {
this.config.edgemicro.plugins.excludeUrls.split(',').forEach( url => this.uniqueUrls.add(url) )
}
this.plugins.forEach( p => {
if ( p.id !== 'analytics' && p.id !== 'metrics' && this.config[p.id] && this.config[p.id].excludeUrls ) {
this.config[p.id].excludeUrls.split(',').forEach( url => this.uniqueUrls.add(url));
}
});
debug('Unique exclude urls', Array.from(this.uniqueUrls));
}
}

loadAllUrls() {

// load global excludeUrls
if ( this.config.edgemicro.plugins && this.config.edgemicro.plugins.excludeUrls ) {
this.config.edgemicro.plugins.excludeUrls.split(',').forEach( url => {
this.urlPluginsCache.set(url, {
plugins: this.defaultPlugins,
isGlobal: true
});
})
}
// load urls from the plugins which are enabled in sequence.
this.plugins.forEach( p => {
if ( p.id !== 'analytics' && p.id !== 'metrics' && this.config[p.id] ) {
let excludeUrls = null;
if ( this.config[p.id].excludeUrls ) {
excludeUrls = this.config[p.id].excludeUrls;
} else if ( p.id === 'quota' && this.config['quotas'] && this.config['quotas'].excludeUrls ) {
excludeUrls = this.config['quotas'].excludeUrls;
}
if (excludeUrls) {
excludeUrls.split(',').forEach( url => {
if ( !this.urlPluginsCache.has(url)) {
// skip this plugin
this.urlPluginsCache.set(url, {
plugins: this.plugins.filter( plgn => plgn.id !== p.id)
});
} else {
let value = this.urlPluginsCache.get(url);
this.urlPluginsCache.set(url, {
plugins: value.plugins.filter( plgn => plgn.id !== p.id)
});
}
}
);
}

}
});
debug('Total urls loaded: %d', this.urlPluginsCache.size);
for (let [url, value] of this.urlPluginsCache) {
debug('url: %s, plugins:',url,value.plugins.map(p=>p.id));
}
}

getPluginSequence(url){

if ( this.urlPluginsCache.has(url) ) {
return this.urlPluginsCache.get(url).plugins;
} else if( this.uniqueUrls.has(url) ){
// check if present in global exclude list
if ( this.config.edgemicro.plugins && this.config.edgemicro.plugins.excludeUrls &&
this.config.edgemicro.plugins.excludeUrls.split(',').indexOf(url) !== -1 ) {
return this.defaultPlugins;
} else {
let urlPlugins = [ ...this.defaultPlugins ];
this.plugins.forEach( p => {
if ( p.id === 'quota' && ( !this.config['quotas'] ||
!this.config['quotas'].excludeUrls || this.config['quotas'].excludeUrls.split(',').indexOf(url) === -1 )) {
urlPlugins.push(p);
} else if ( p.id !== 'analytics' && p.id !== 'metrics' && ( !this.config[p.id] ||
!this.config[p.id].excludeUrls || this.config[p.id].excludeUrls.split(',').indexOf(url) === -1 ) ) {
urlPlugins.push(p);
}
});
return urlPlugins;
}
} else {
return this.plugins;
}

}
}

module.exports = PluginsSeqManager;
4 changes: 3 additions & 1 deletion lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const errorsLib = require('./errors-middleware')
const logging = require('./logging')
const assert = require('assert')
const configService = require('./config')
const PluginsSeqManager = require('./PluginsSeqManager')

const CONSOLE_LOG_TAG = 'microgateway-core gateway';
//http server
Expand All @@ -32,7 +33,8 @@ module.exports.start = function(plugins, cb) {
const logger = logging.getLogger()

const configProxy = configProxyFactory()
const pluginsMiddleware = pluginsFactory(plugins)
const pluginsSeqManager = new PluginsSeqManager(config, plugins);
const pluginsMiddleware = pluginsFactory(pluginsSeqManager)
const errors = errorsLib(config, logger)

const serverMiddleware = function(req, res) {
Expand Down
23 changes: 20 additions & 3 deletions lib/logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ module.exports.init = function init(stubConfig, options) {
var records = [];
var offset = 0;
var nextRotation = 0;
var logFileOpenFailWarn = false;


// buffer the write if a write is already in progress
// https://nodejs.org/api/fs.html#fs_fs_write_fd_data_position_encoding_callback
// "Note that it is unsafe to use fs.write multiple times on the same file without waiting for the callback."
const writeLogRecordToFile = function (record,cb) {
if (Date.now() > nextRotation) {
if ( !logFileOpenFailWarn ) {
rotation++;
if (logFileFd) {
fs.close(logFileFd);
Expand All @@ -63,8 +65,18 @@ module.exports.init = function init(stubConfig, options) {
writeConsoleLog('log', {component: CONSOLE_LOG_TAG}, 'logging to ' + logFilePath);
}
}

logFileFd = fs.openSync(logFilePath, 'a', 0o0600);
}
try {
logFileFd = fs.openSync(logFilePath, 'a', 0o0600);
} catch (e) {
if ( !logFileOpenFailWarn ) {
writeConsoleLog('log',{component: CONSOLE_LOG_TAG}, 'Error in creating log file: %s, error: %s',logFilePath, e.message);
logFileOpenFailWarn = true;
}
writeLogRecordToConsole(record);
return record;
}
logFileOpenFailWarn = false;
nextRotation = Date.now() + ((logConfig.rotate_interval || 24) * 60 * 60 * 1000); // hours -> ms
}

Expand Down Expand Up @@ -178,14 +190,19 @@ module.exports.init = function init(stubConfig, options) {
if ( !isValidIPaddress(clientIP) ) {
clientIP = '';
}
let targetPortStr = '';
if ( !isNaN(parseInt(sourceRequest.targetPort)) ) {
targetPortStr = ':'+ parseInt(sourceRequest.targetPort);
}
sourceRequest.transactionContextData = {
correlation_id: correlation_id,
method: sourceRequest.method,
url: sourceRequest.url,
host: (sourceRequest.headers ? sourceRequest.headers.host : ''),
clientId: (sourceRequest.headers ? sourceRequest.headers['x-api-key'] : ''),
remoteAddress: (sourceRequest.socket ? (sourceRequest.socket.remoteAddress + ':' + sourceRequest.socket.remotePort) : ':0'),
clientIP: clientIP
clientIP: clientIP,
targetHostName: sourceRequest.targetHostname + targetPortStr
}
}
};
Expand Down
68 changes: 46 additions & 22 deletions lib/plugins-middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ var traceHelper = require('./trace-helper');

/**
* injects plugins into gateway
* @param plugins
* @param pluginsSeqManager
* @returns {Function(req,res,cb)}
*/
module.exports = function(plugins) {
module.exports = function(pluginsSeqManager) {
const logger = logging.getLogger();

return function(sourceRequest, sourceResponse, next) {
const startTime = sourceRequest.reqStartTimestamp || Date.now();
const plugins = pluginsSeqManager.getPluginSequence(sourceRequest.url);
const correlation_id = uuid.v1();
sourceRequest['correlationId'] = correlation_id;
logger.setTransactionContext( correlation_id,sourceRequest);
Expand Down Expand Up @@ -263,8 +264,9 @@ function getTargetRequest(sourceRequest, sourceResponse, plugins, startTime, cor
logger.eventLog({level:'warn',
d: Date.now() - startTime,
err: err,
req: sourceRequest,
component:'plugins-middleware'
h: sourceRequest.transactionContextData.targetHostName,
component:'plugins-middleware',
transactionContextData: sourceRequest.transactionContextData
}, 'targetRequest error');
debug('targetRequest error', correlation_id, err.stack);
async.series(getPluginHooksForEvent('error', {
Expand Down Expand Up @@ -297,9 +299,24 @@ function getTargetRequest(sourceRequest, sourceResponse, plugins, startTime, cor
});
});

// abort the target request if the connection from the sourceRequest/sourceResponse is closed.
sourceResponse.on('close', function(e){
if (sourceRequest.aborted) {
debug('source request aborted - abort target request', correlation_id);
targetRequest.abort();
logger.eventLog({
level:'info',
res: sourceResponse,
req: sourceRequest,
d: Date.now() - ( sourceRequest.headers['target_sent_start_timestamp'] || startTime ),
component:'plugins-middleware'
}, 'sourceResponse on close, source request aborted - abort target request');
}
});

const logInfo = {
level: 'info',
req: targetRequest,
h: sourceRequest.transactionContextData.targetHostName,
component:'plugins-middleware',
transactionContextData: sourceRequest.transactionContextData
};
Expand Down Expand Up @@ -342,7 +359,7 @@ function handleTargetResponse(targetRequest, targetResponse, options, cb) {
logger.eventLog({
level:'info',
res: targetResponse,
req: targetRequest,
h: sourceRequest.transactionContextData.targetHostName,
d: Date.now() - ( sourceRequest.headers['target_sent_start_timestamp'] || start ),
component:'plugins-middleware',
transactionContextData: sourceRequest.transactionContextData
Expand Down Expand Up @@ -371,7 +388,9 @@ function handleTargetResponse(targetRequest, targetResponse, options, cb) {
Object.keys(targetResponse.headers).forEach(function(header) {
// skip setting the 'connection: keep-alive' header
// setting it causes gateway to not accept any more connections
if (header !== 'connection') {
// Headers that have been set in onresponse are also skipped to let the plugins override those from the
// target response.
if (header !== 'connection' && !sourceResponse.hasHeader(header)) {
sourceResponse.setHeader(header, targetResponse.headers[header]);
}
});
Expand Down Expand Up @@ -502,20 +521,21 @@ function _subscribeToResponseEvents(plugins, sourceRequest, sourceResponse, targ
if (err) {
logger.eventLog({level:'error',req: sourceRequest, res: sourceResponse, err: err,component:'plugins-middleware'});
}
if (result && result.length) {
sourceResponse.end(result);
} else {
sourceResponse.end();
if (!sourceResponse.writableFinished) {
if (result && result.length) {
sourceResponse.end(result);
} else {
sourceResponse.end();
}
debug('sourceResponse close', correlation_id, sourceResponse.statusCode);
logger.eventLog({
level:'info',
res: sourceResponse,
req: sourceRequest,
d: Date.now() - sourceRequest.reqStartTimestamp || start,
component:'plugins-middleware'
}, 'sourceResponse' );
}
debug('sourceResponse close', correlation_id, sourceResponse.statusCode);
logger.eventLog({
level:'info',
res: sourceResponse,
req: sourceRequest,
d: Date.now() - sourceRequest.reqStartTimestamp || start,
component:'plugins-middleware'
}, 'sourceResponse' );

return cb(err, result);
});
});
Expand All @@ -531,18 +551,22 @@ function _subscribeToResponseEvents(plugins, sourceRequest, sourceResponse, targ
logger.eventLog({
level:'info',
res: targetResponse,
h: sourceRequest.transactionContextData.targetHostName,
d: Date.now() - ( sourceRequest.headers['target_sent_start_timestamp'] || targetStartTime ),
component:'plugins-middleware'
component:'plugins-middleware',
transactionContextData: sourceRequest.transactionContextData
}, 'targetResponse close');
});

targetResponse.on('error', function(err) {
logger.eventLog({
level:'error',
res: targetResponse,
h: sourceRequest.transactionContextData.targetHostName,
d: Date.now() - ( sourceRequest.headers['target_sent_start_timestamp'] || targetStartTime ),
err: err,
component:'plugins-middleware'
component:'plugins-middleware',
transactionContextData: sourceRequest.transactionContextData
}, 'targetResponse error');

debug('targetResponse error', correlation_id, err.stack);
Expand Down
20 changes: 14 additions & 6 deletions lib/plugins.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const CONSOLE_LOG_TAG = 'microgateway-core plugins';

var Plugins = function () {
this.config = configService.get();
this.emgConfigs = null;
};


Expand All @@ -28,6 +29,18 @@ var Plugins = function () {
Plugins.prototype.loadPlugin = function (options) {
var name;
var plugin;
// extract only emg level configs by removing plugin configs.
if (options.allPluginNames && !this.emgConfigs) {
this.emgConfigs = {}
let sequence = [];
if ( this.config.edgemicro.plugins && this.config.edgemicro.plugins.sequence ) {
sequence = this.config.edgemicro.plugins.sequence;
}
Object.keys(this.config).filter( key => sequence.indexOf(key) === -1 && key !== 'quotas'
&& options.allPluginNames.indexOf(key) === -1).forEach( emgKey => {
this.emgConfigs[emgKey] = this.config[emgKey];
});
}

if (_.isObject(options)) {
assert(options.plugin, "must have plugin loaded in memory");
Expand All @@ -47,17 +60,12 @@ Plugins.prototype.loadPlugin = function (options) {

var middleware;
const subconfig = config[name] || {};

subconfig.emgConfigs = this.emgConfigs;
if (plugin) {
if(config.keys && config.keys.key && config.keys.secret){
subconfig.key = config.keys.key;
subconfig.secret = config.keys.secret;
}
if(name==='quota') {
subconfig.proxies = config.proxies;
subconfig.product_to_proxy = config.product_to_proxy;
subconfig.global = config.edgemicro.global;
}
middleware = plugin(subconfig, logger, stats);
assert(_.isObject(middleware), 'ignoring invalid plugin handlers ' + name);
middleware.id = name;
Expand Down
6 changes: 6 additions & 0 deletions lib/proxy-path-matcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ const minimatch = require('minimatch');
//const util = require('util');

const matchPath = (requestPath, proxyPath) => {
if ( requestPath.charAt(requestPath.length-1) !== '/') {
requestPath += '/';
}
if ( proxyPath.charAt(proxyPath.length-1) !== '/') {
proxyPath += '/';
}
var pathRegex = generateMatchingRegex(proxyPath);
return pathRegex.test(requestPath);
}
Expand Down
Loading