Skip to content

Commit

Permalink
Merge branch 'optimizer' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxou44 committed Apr 9, 2019
2 parents c38c927 + 8bc30ed commit 227738a
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 16 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"getenv": "^0.7.0",
"http-proxy": "^1.17.0",
"md5": "^2.2.1",
"mkdirp": "^0.5.1",
"node-fetch": "^2.2.1",
"node-pre-gyp": "^0.11.0",
"pg": "^7.8.1",
Expand Down
96 changes: 88 additions & 8 deletions src/core/sessions.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import debug from 'debug';
import config from '../config';
import { publicUrl, plexUrl } from '../utils';
import { publicUrl, plexUrl, download, mdir } from '../utils';
import { dirname } from 'path';
import SessionStore from '../store';
import ServersManager from './servers';
import Database from '../database';
import fetch from 'node-fetch';
import uniqid from 'uniqid';

// Debugger
const D = debug('UnicornLoadBalancer');
Expand All @@ -13,8 +16,10 @@ let SessionsManager = {};
// Plex table to match "session" and "X-Plex-Session-Identifier"
let cache = {};

let ffmpegCache = {};

// Table to link session to transcoder url
let urls = {}
let urls = {};

SessionsManager.chooseServer = async (session, ip = false) => {
if (urls[session])
Expand Down Expand Up @@ -57,7 +62,7 @@ SessionsManager.getSessionFromRequest = (req) => {
}

// Parse FFmpeg parameters with internal bindings
SessionsManager.parseFFmpegParameters = async (args = [], env = {}) => {
SessionsManager.parseFFmpegParameters = async (args = [], env = {}, optimizeMode = false) => {
// Extract Session ID
const regex = /^http\:\/\/127.0.0.1:32400\/video\/:\/transcode\/session\/(.*)\/progress$/;
const sessions = args.filter(e => (regex.test(e))).map(e => (e.match(regex)[1]))
Expand Down Expand Up @@ -89,6 +94,7 @@ SessionsManager.parseFFmpegParameters = async (args = [], env = {}) => {
// Add seglist to arguments if needed and resolve links if needed
const segList = '{INTERNAL_TRANSCODER}video/:/transcode/session/' + sessionFull + '/seglist';
let finalArgs = [];
let optimize = {};
let segListMode = false;
for (let i = 0; i < parsedArgs.length; i++) {
let e = parsedArgs[i];
Expand All @@ -107,6 +113,13 @@ SessionsManager.parseFFmpegParameters = async (args = [], env = {}) => {
continue;
}

// Optimize, replace optimize path
if (optimizeMode && i > 0 && parsedArgs[i - 1] !== '-i' && e[0] === '/') {
finalArgs.push(`{OPTIMIZE_PATH}${e.split('/').slice(-1).pop()}`);
optimize[e.split('/').slice(-1).pop()] = e;
continue;
}

// Link resolver (Replace filepath to http plex path)
if (i > 0 && parsedArgs[i - 1] === '-i' && !config.custom.download.forward) {
let file = parsedArgs[i];
Expand All @@ -125,25 +138,92 @@ SessionsManager.parseFFmpegParameters = async (args = [], env = {}) => {
finalArgs.push(e);
};
return ({
id: uniqid(),
args: finalArgs,
env,
session: sessionId,
sessionFull
sessionFull,
optimize
});
};

// Store the FFMPEG parameters in RedisCache
SessionsManager.storeFFmpegParameters = async (args, env) => {
const parsed = await SessionsManager.parseFFmpegParameters(args, env);
console.log('FFMPEG', parsed.session, parsed);
SessionsManager.saveSession = (parsed) => {
SessionStore.set(parsed.session, parsed).then(() => { }).catch(() => { })
return (parsed);
};

// Call media optimizer on transcoders
SessionsManager.optimizerInit = async (parsed) => {
D(`OPTIMIZER ${parsed.session} [START]`);
const server = await ServersManager.chooseServer(parsed.session, false)
fetch(`${server}/api/optimize`, {
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
method: 'POST',
body: JSON.stringify(parsed)
})
return parsed;
};

// Call media optimizer on transcoders
SessionsManager.optimizerDelete = async (parsed) => {
D(`OPTIMIZER ${parsed.session} [DELETE]`);
SessionsManager.ffmpegSetCache(parsed.id, 0);
const server = await ServersManager.chooseServer(parsed.session, false)
fetch(`${server}/api/optimize/${parsed.session}`, {
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
method: 'DELETE',
body: JSON.stringify(parsed)
});
SessionsManager.cleanSession(parsed.session);
return parsed;
};

// Callback of the optimizer server
SessionsManager.optimizerDownload = (parsed) => (new Promise(async (resolve, reject) => {
const files = Object.keys(parsed.optimize);
const server = await SessionsManager.chooseServer(parsed.session);
for (let i = 0; i < files.length; i++) {
D(`OPTIMIZER ${server}/api/optimize/${parsed.session}/${encodeURIComponent(files[i])} [DOWNLOAD]`);
try {
await mdir(dirname(parsed.optimize[files[i]]));
}
catch (err) {
D(`OPTIMIZER Failed to create directory`);
}
try {
await download(`${server}/api/optimize/${parsed.session}/${encodeURIComponent(files[i])}`, parsed.optimize[files[i]])
}
catch (err) {
D(`OPTIMIZER ${server}/api/optimize/${parsed.session}/${encodeURIComponent(files[i])} [FAILED]`);
}
}
resolve(parsed);
}));

// Clear session
SessionsManager.cleanSession = (sessionId) => {
D('DELETE ' + sessionId);
return SessionStore.delete(sessionId)
};

// Set FFmpeg cache
SessionsManager.ffmpegSetCache = (id, status) => {
ffmpegCache[id] = status;
return ffmpegCache[id];
};

// Get FFmpeg cache
SessionsManager.ffmpegGetCache = (id) => {
if (typeof (ffmpegCache[id]) !== 'undefined')
return ffmpegCache[id];
return false;
};

// Export our SessionsManager
export default SessionsManager;
48 changes: 43 additions & 5 deletions src/routes/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,40 @@ RoutesAPI.update = (req, res) => {
res.send(ServersManager.update(req.body));
};

// Save the FFMPEG arguments
// Catch the FFMPEG arguments
// Body: {args: [], env: []}
RoutesAPI.ffmpeg = (req, res) => {
console.log('FFMPEG CALLED 1')
RoutesAPI.ffmpeg = async (req, res) => {
if (!req.body || !req.body.arg || !req.body.env)
return (res.status(400).send({ error: { code: 'INVALID_ARGUMENTS', message: 'Invalid UnicornFFMPEG parameters' } }));
console.log('FFMPEG CALLED 2')
return (res.send(SessionsManager.storeFFmpegParameters(req.body.arg, req.body.env)));

// Detect if we are in optimizer mode
if (req.body.arg.filter(e => (e === '-segment_list' || e === '-manifest_name')).length === 0) {
const parsedArgs = await SessionsManager.parseFFmpegParameters(req.body.arg, req.body.env, true);
SessionsManager.ffmpegSetCache(parsedArgs.id, false);
D('FFMPEG ' + parsedArgs.session + ' [OPTIMIZE]');
SessionsManager.saveSession(parsedArgs);
SessionsManager.optimizerInit(parsedArgs);
return (res.send(parsedArgs));
}
// Streaming mode
else {
const parsedArgs = await SessionsManager.parseFFmpegParameters(req.body.arg, req.body.env);
SessionsManager.ffmpegSetCache(parsedArgs.id, false);
D('FFMPEG ' + parsedArgs.session + ' [STREAMING]');
SessionsManager.saveSession(parsedArgs)
return (res.send(parsedArgs));
}
};

// Get FFMPEG status
RoutesAPI.ffmpegStatus = async (req, res) => {
if (!req.params.id)
return (res.status(400).send({ error: { code: 'INVALID_ARGUMENTS', message: 'Invalid parameters' } }));
D('FFMPEG ' + req.params.id + ' [PING]');
return (res.send({
id: req.params.id,
status: SessionsManager.ffmpegGetCache(req.params.id)
}));
};

// Resolve path from file id
Expand Down Expand Up @@ -67,5 +93,17 @@ RoutesAPI.session = (req, res) => {
})
};

// Optimizer finish
RoutesAPI.optimize = (req, res) => {
SessionStore.get(req.params.session).then((data) => {
SessionsManager.optimizerDownload(data).then((parsedData) => {
SessionsManager.optimizerDelete(parsedData);
});
res.send(data);
}).catch(() => {
res.status(400).send({ error: { code: 'SESSION_TIMEOUT', message: 'Invalid session' } });
})
};

// Export all our API routes
export default RoutesAPI;
2 changes: 2 additions & 0 deletions src/routes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ export default (app) => {
app.use('/api/sessions', express.static(config.plex.path.sessions));
app.get('/api/stats', RoutesAPI.stats);
app.post('/api/ffmpeg', RoutesAPI.ffmpeg);
app.get('/api/ffmpeg/:id', RoutesAPI.ffmpegStatus);
app.get('/api/path/:id', RoutesAPI.path);
app.post('/api/update', RoutesAPI.update);
app.get('/api/session/:session', RoutesAPI.session);
app.patch('/api/optimize/:session', RoutesAPI.optimize);
app.all('/api/plex/*', RoutesAPI.plex);

// MPEG Dash support
Expand Down
2 changes: 1 addition & 1 deletion src/routes/transcode.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,6 @@ RoutesTranscode.download = (req, res) => {
}).catch((err) => {
res.status(400).send({ error: { code: 'NOT_FOUND', message: 'File not available' } });
})
}
};

export default RoutesTranscode;
26 changes: 24 additions & 2 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import redisClient from 'redis';

import fs from 'fs';
import fetch from 'node-fetch';
import mkdirp from 'mkdirp';
import config from './config';

export const publicUrl = () => {
Expand Down Expand Up @@ -31,4 +33,24 @@ export const getRedisClient = () => {
return redis;
};

export const time = () => (Math.floor((new Date().getTime()) / 1000));
export const time = () => (Math.floor((new Date().getTime()) / 1000));

export const download = (url, filepath) => (new Promise(async (resolve, reject) => {
const res = await fetch(url);
const fileStream = fs.createWriteStream(filepath);
res.body.pipe(fileStream);
res.body.on("error", (err) => {
reject(err);
});
fileStream.on("finish", () => {
resolve();
});
}));

export const mdir = (path) => (new Promise((resolve, reject) => {
mkdirp(path, (err) => {
if (err)
return reject(err);
return resolve(path);
})
}));

0 comments on commit 227738a

Please sign in to comment.