123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- var AWS = require('../core');
- var Stream = AWS.util.stream.Stream;
- var TransformStream = AWS.util.stream.Transform;
- var ReadableStream = AWS.util.stream.Readable;
- require('../http');
// Name of the environment variable that, when set to '1', makes the default
// agents created by this client use keep-alive connections.
var CONNECTION_REUSE_ENV_NAME = 'AWS_NODEJS_CONNECTION_REUSE_ENABLED';

/**
 * HTTP client implementation for Node.js, built on the core `http` and
 * `https` modules.
 *
 * @api private
 */
AWS.NodeHttpClient = AWS.util.inherit({
  /**
   * Sends `httpRequest` and wires up timeout and error handling.
   *
   * @param httpRequest request description; `endpoint`, `method`, `headers`,
   *   `path` and `body` are read, and the created request stream is attached
   *   to it as `httpRequest.stream`.
   * @param httpOptions optional settings merged into the options passed to
   *   `http.request`. `proxy`, `timeout`, `connectTimeout` and `agent` are
   *   interpreted here; `proxy` and `timeout` are removed before the request
   *   is created.
   * @param callback called with the response object once the response
   *   starts; a 'headers' event is emitted on the response right after.
   * @param errCallback called at most once with a timeout or socket error.
   * @return the request stream returned by `http.request`.
   */
  handleRequest: function handleRequest(httpRequest, httpOptions, callback, errCallback) {
    var self = this;
    var endpoint = httpRequest.endpoint;
    var pathPrefix = '';
    // Shared by the 'socket' and 'error' handlers below, so it lives at
    // function scope instead of inside the connectTimeout branch.
    var connectTimeoutId = null;

    if (!httpOptions) httpOptions = {};

    if (httpOptions.proxy) {
      // When proxying, connect to the proxy and request the absolute URL of
      // the real endpoint. The port is left out for 80 and 443.
      pathPrefix = endpoint.protocol + '//' + endpoint.hostname;
      if (endpoint.port !== 80 && endpoint.port !== 443) {
        pathPrefix += ':' + endpoint.port;
      }
      endpoint = new AWS.Endpoint(httpOptions.proxy);
    }

    var useSSL = endpoint.protocol === 'https:';
    var http = useSSL ? require('https') : require('http');
    var options = {
      host: endpoint.hostname,
      port: endpoint.port,
      method: httpRequest.method,
      headers: httpRequest.headers,
      path: pathPrefix + httpRequest.path
    };

    AWS.util.update(options, httpOptions);

    if (!httpOptions.agent) {
      options.agent = this.getAgent(useSSL, {
        keepAlive: process.env[CONNECTION_REUSE_ENV_NAME] === '1'
      });
    }

    delete options.proxy; // proxy isn't an HTTP option
    delete options.timeout; // timeout isn't an HTTP option

    var stream = http.request(options, function (httpResp) {
      // A timeout or error has already been reported; ignore the response.
      if (stream.didCallback) return;

      callback(httpResp);
      httpResp.emit(
        'headers',
        httpResp.statusCode,
        httpResp.headers,
        httpResp.statusMessage
      );
    });
    httpRequest.stream = stream; // attach stream to httpRequest
    // Guards errCallback so only the first timeout/error is reported.
    stream.didCallback = false;

    // connection timeout support
    if (httpOptions.connectTimeout) {
      stream.on('socket', function(socket) {
        // Only sockets that are still connecting get a connect timer.
        if (socket.connecting) {
          connectTimeoutId = setTimeout(function connectTimeout() {
            if (stream.didCallback) return;
            stream.didCallback = true;

            stream.abort();
            errCallback(AWS.util.error(
              new Error('Socket timed out without establishing a connection'),
              {code: 'TimeoutError'}
            ));
          }, httpOptions.connectTimeout);
          socket.on('connect', function() {
            clearTimeout(connectTimeoutId);
            connectTimeoutId = null;
          });
        }
      });
    }

    // timeout support
    stream.setTimeout(httpOptions.timeout || 0, function() {
      if (stream.didCallback) return;
      stream.didCallback = true;

      var msg = 'Connection timed out after ' + httpOptions.timeout + 'ms';
      errCallback(AWS.util.error(new Error(msg), {code: 'TimeoutError'}));
      stream.abort();
    });

    stream.on('error', function(err) {
      if (connectTimeoutId) {
        clearTimeout(connectTimeoutId);
        connectTimeoutId = null;
      }
      if (stream.didCallback) return;
      stream.didCallback = true;

      // These socket-level failures are reported under the 'TimeoutError' code.
      if ('ECONNRESET' === err.code || 'EPIPE' === err.code || 'ETIMEDOUT' === err.code) {
        errCallback(AWS.util.error(err, {code: 'TimeoutError'}));
      } else {
        errCallback(err);
      }
    });

    var expect = httpRequest.headers.Expect || httpRequest.headers.expect;
    if (expect === '100-continue') {
      // Hold the body back until the 'continue' event fires.
      stream.once('continue', function() {
        self.writeBody(stream, httpRequest);
      });
    } else {
      this.writeBody(stream, httpRequest);
    }

    return stream;
  },

  /**
   * Writes `httpRequest.body` to the request stream and ends it, emitting
   * 'sendProgress' events (`{loaded, total}`) on the request stream.
   *
   * The total is taken from the Content-Length header (either casing). For
   * in-memory bodies without a usable header, the body's byte length is used
   * so the event never reports NaN. For streamed bodies without the header,
   * the total remains NaN.
   *
   * @param stream the request stream to write to.
   * @param httpRequest request whose `body` and `headers` are read.
   */
  writeBody: function writeBody(stream, httpRequest) {
    var body = httpRequest.body;
    var headers = httpRequest.headers;
    // Accept both header casings, mirroring the Expect/expect lookup in
    // handleRequest.
    var contentLength = headers['Content-Length'];
    if (contentLength === undefined) contentLength = headers['content-length'];
    var totalBytes = parseInt(contentLength, 10);

    if (body instanceof Stream) {
      // For progress support of streaming content -
      // pipe the data through a transform stream to emit 'sendProgress' events
      var progressStream = this.progressStream(stream, totalBytes);
      if (progressStream) {
        body.pipe(progressStream).pipe(stream);
      } else {
        body.pipe(stream);
      }
    } else if (body) {
      // The body is fully in memory, so its size is known even when the
      // header is missing or unparseable. Byte length is used rather than
      // string length, which counts characters and not bytes.
      if (isNaN(totalBytes)) {
        if (typeof body === 'string' || Buffer.isBuffer(body)) {
          totalBytes = Buffer.byteLength(body);
        } else if (typeof body.byteLength === 'number') {
          totalBytes = body.byteLength;
        }
      }

      // For performance the whole body is sent with a single stream.end(body).
      // Callers expect a 'sendProgress' event, which is emitted once the
      // request stream has been fully written and all data flushed.
      stream.once('finish', function() {
        stream.emit('sendProgress', {
          loaded: totalBytes,
          total: totalBytes
        });
      });
      stream.end(body);
    } else {
      // no request body
      stream.end();
    }
  },

  /**
   * Returns the shared https.Agent or http.Agent according to the request
   * scheme. Each agent is created on first use and cached on
   * AWS.NodeHttpClient, so `agentOptions` only take effect on the call that
   * creates the agent.
   *
   * @param useSSL whether to return the HTTPS agent.
   * @param agentOptions options passed to the Agent constructor.
   * @return the cached agent for the scheme.
   */
  getAgent: function getAgent(useSSL, agentOptions) {
    var http = useSSL ? require('https') : require('http');
    if (useSSL) {
      if (!AWS.NodeHttpClient.sslAgent) {
        AWS.NodeHttpClient.sslAgent = new http.Agent(AWS.util.merge({
          rejectUnauthorized: process.env.NODE_TLS_REJECT_UNAUTHORIZED !== '0'
        }, agentOptions || {}));
        AWS.NodeHttpClient.sslAgent.setMaxListeners(0);

        // delegate maxSockets to globalAgent, set a default limit of 50 if current value is Infinity.
        // Users can bypass this default by supplying their own Agent as part of SDK configuration.
        Object.defineProperty(AWS.NodeHttpClient.sslAgent, 'maxSockets', {
          enumerable: true,
          get: function() {
            var defaultMaxSockets = 50;
            var globalAgent = http.globalAgent;
            if (globalAgent && globalAgent.maxSockets !== Infinity && typeof globalAgent.maxSockets === 'number') {
              return globalAgent.maxSockets;
            }
            return defaultMaxSockets;
          }
        });
      }
      return AWS.NodeHttpClient.sslAgent;
    } else {
      if (!AWS.NodeHttpClient.agent) {
        AWS.NodeHttpClient.agent = new http.Agent(agentOptions);
      }
      return AWS.NodeHttpClient.agent;
    }
  },

  /**
   * Creates a pass-through transform stream that counts the bytes flowing
   * through it and emits 'sendProgress' on `stream` for every chunk.
   *
   * @param stream the request stream on which 'sendProgress' is emitted.
   * @param totalBytes value reported as `total` in each event.
   * @return the transform stream, or undefined when Transform streams are
   *   unavailable.
   */
  progressStream: function progressStream(stream, totalBytes) {
    if (typeof TransformStream === 'undefined') {
      // for node 0.8 there is no streaming progress
      return;
    }
    var loadedBytes = 0;
    var reporter = new TransformStream();
    reporter._transform = function(chunk, encoding, callback) {
      if (chunk) {
        loadedBytes += chunk.length;
        stream.emit('sendProgress', {
          loaded: loadedBytes,
          total: totalBytes
        });
      }
      // Forward the chunk unchanged.
      callback(null, chunk);
    };
    return reporter;
  },

  emitter: null
});
/**
 * @!ignore
 */

/**
 * Use the Node.js client's prototype as the prototype of AWS.HttpClient.
 *
 * @api private
 */
AWS.HttpClient.prototype = AWS.NodeHttpClient.prototype;

/**
 * Streams API version: 2 when a Readable stream class is available,
 * otherwise 1.
 *
 * @api private
 */
if (ReadableStream) {
  AWS.HttpClient.streamsApiVersion = 2;
} else {
  AWS.HttpClient.streamsApiVersion = 1;
}
|