"use strict"; /* * Licensed to Elasticsearch B.V. under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ var _a; Object.defineProperty(exports, "__esModule", { value: true }); const tslib_1 = require("tslib"); const url_1 = require("url"); const debug_1 = tslib_1.__importDefault(require("debug")); const Diagnostic_1 = tslib_1.__importDefault(require("../Diagnostic")); const symbols_1 = require("../symbols"); const connection_1 = require("../connection"); const errors_1 = require("../errors"); const debug = (0, debug_1.default)('elasticsearch'); class BaseConnectionPool { constructor(opts) { var _b; Object.defineProperty(this, "connections", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "size", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "Connection", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "diagnostic", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "auth", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "_agent", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "_proxy", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, "_tls", { enumerable: true, configurable: true, writable: true, value: void 0 }); Object.defineProperty(this, _a, { enumerable: true, configurable: true, writable: true, value: void 0 }); // list of nodes and weights this.connections = []; // how many nodes we have in our scheduler this.size = this.connections.length; this.Connection = opts.Connection; this.diagnostic = (_b = opts.diagnostic) !== null && _b !== void 0 ? _b : new Diagnostic_1.default(); this.auth = opts.auth; this._tls = opts.tls; this._agent = opts.agent; this._proxy = opts.proxy; this[symbols_1.kCaFingerprint] = opts.caFingerprint; } markAlive(connection) { connection.status = connection_1.BaseConnection.statuses.ALIVE; return this; } markDead(connection) { connection.status = connection_1.BaseConnection.statuses.DEAD; return this; } getConnection(opts) { throw new errors_1.ConfigurationError('The getConnection method should be implemented by extended classes'); } /** * Creates a new connection instance. */ createConnection(opts) { if (typeof opts === 'string') { opts = this.urlToHost(opts); } if (this.auth != null) { opts.auth = this.auth; } else if (opts.url.username !== '' && opts.url.password !== '') { opts.auth = { username: decodeURIComponent(opts.url.username), password: decodeURIComponent(opts.url.password) }; } /* istanbul ignore else */ if (opts.tls == null) opts.tls = this._tls; /* istanbul ignore else */ if (opts.agent == null) opts.agent = this._agent; /* istanbul ignore else */ if (opts.proxy == null) opts.proxy = this._proxy; /* istanbul ignore else */ if (opts.diagnostic == null) opts.diagnostic = this.diagnostic; /* istanbul ignore else */ if (opts.caFingerprint == null) opts.caFingerprint = this[symbols_1.kCaFingerprint]; const connection = new this.Connection(opts); for (const conn of this.connections) { if (conn.id === connection.id) { throw new Error(`Connection with id '${connection.id}' is already present`); } } return connection; } /** * Adds a new connection to the pool. * * @param {object|string} host * @returns {ConnectionPool} */ addConnection(connection) { if (Array.isArray(connection)) { const connections = []; for (const conn of connection) { connections.push(this.createConnection(conn)); } return this.update([...this.connections, ...connections]); } else { return this.update([...this.connections, this.createConnection(connection)]); } } /** * Removes a new connection to the pool. * * @param {object} connection * @returns {ConnectionPool} */ removeConnection(connection) { debug('Removing connection', connection); return this.update(this.connections.filter(c => c.id !== connection.id)); } /** * Empties the connection pool. * * @returns {ConnectionPool} */ async empty() { debug('Emptying the connection pool'); const connections = this.connections; this.connections = []; this.size = 0; for (const connection of connections) { await connection.close(); } } /** * Update the ConnectionPool with new connections. * * @param {array} array of connections * @returns {ConnectionPool} */ update(nodes) { debug('Updating the connection pool'); const newConnections = []; const oldConnections = []; for (const node of nodes) { // if we already have a given connection in the pool // we mark it as alive and we do not close the connection // to avoid socket issues const connectionById = this.connections.find(c => c.id === node.id); const connectionByUrl = this.connections.find(c => c.id === node.url.href); if (connectionById != null) { debug(`The connection with id '${node.id}' is already present`); this.markAlive(connectionById); newConnections.push(connectionById); // in case the user has passed a single url (or an array of urls), // the connection id will be the full href; to avoid closing valid connections // because are not present in the pool, we check also the node url, // and if is already present we update its id with the ES provided one. } else if (connectionByUrl != null) { connectionByUrl.id = node.id; this.markAlive(connectionByUrl); newConnections.push(connectionByUrl); } else { if (node instanceof connection_1.BaseConnection) { newConnections.push(node); } else { newConnections.push(this.createConnection(node)); } } } const ids = nodes.map(c => c.id); // remove all the dead connections and old connections for (const connection of this.connections) { if (!ids.includes(connection.id)) { oldConnections.push(connection); } } // close old connections for (const connection of oldConnections) { connection.close().catch(/* istanbul ignore next */ () => { }); } this.connections = newConnections; this.size = this.connections.length; return this; } /** * Transforms the nodes objects to a host object. * * @param {object} nodes * @returns {array} hosts */ nodesToHost(nodes, protocol) { const ids = Object.keys(nodes); const hosts = []; for (let i = 0, len = ids.length; i < len; i++) { const node = nodes[ids[i]]; // newly-added nodes do not have http assigned yet, so skip if (node.http === undefined) continue; // If there is no protocol in // the `publish_address` new URL will throw // the publish_address can have two forms: // - ip:port // - hostname/ip:port // if we encounter the second case, we should // use the hostname instead of the ip let address = node.http.publish_address; const parts = address.split('/'); // the url is in the form of hostname/ip:port if (parts.length > 1) { const hostname = parts[0]; const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1); address = `${hostname}:${port}`; } address = address.slice(0, 4) === 'http' /* istanbul ignore next */ ? address : `${protocol}//${address}`; hosts.push({ url: new url_1.URL(address), id: ids[i] }); } return hosts; } /** * Transforms an url string to a host object * * @param {string} url * @returns {object} host */ urlToHost(url) { return { url: new url_1.URL(url) }; } } exports.default = BaseConnectionPool; _a = symbols_1.kCaFingerprint; //# sourceMappingURL=BaseConnectionPool.js.map