From dc4b9df635c7176d80acc4c7fbbd68152bff5444 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Fri, 9 Dec 2016 21:04:22 +0100 Subject: [PATCH] Re-add cluster-client Cluster client could be used for InfluxDB Enterprise or old InfluxDB 0.11 cluster. --- influxdb/client.py | 176 ++++++++++++++++++++++++++++++++++ influxdb/tests/client_test.py | 124 ++++++++++++++++++++++++ 2 files changed, 300 insertions(+) diff --git a/influxdb/client.py b/influxdb/client.py index ab9aa409..2b77fc99 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -7,8 +7,12 @@ from __future__ import print_function from __future__ import unicode_literals +from functools import wraps import json import socket +import time +import threading +import random import requests import requests.exceptions from sys import version_info @@ -110,6 +114,9 @@ def __init__(self, 'Accept': 'text/plain' } + # _baseurl, _host and _port are properties to allow + # influxdb.InfluxDBClusterClient to override them with + # thread-local variables @property def _baseurl(self): return self._get_baseurl() @@ -783,6 +790,175 @@ def send_packet(self, packet, protocol='json'): self.udp_socket.sendto(data, (self._host, self.udp_port)) +class InfluxDBClusterClient(object): + """The :class:`~.InfluxDBClusterClient` is the client for connecting + to a cluster of InfluxDB servers. Each query hits different host from the + list of hosts. + + WARNING: This only works if all node in the lists are equivalent. E.g. + writing or reading to one node or another give the same result. + This works with InfluxDB Enterprise cluster or old cluster using + InfluxDB 0.11. + It does NOT works with newer open-source cluster solution using + influxdb-relay. + + :param hosts: all hosts to be included in the cluster, each of which + should be in the format (address, port), + e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to + [('localhost', 8086)] + :type hosts: list of tuples + :param shuffle: whether the queries should hit servers evenly(randomly), + defaults to True + :type shuffle: bool + :param client_base_class: the base class for the cluster client. + This parameter is used to enable the support of different client + types. Defaults to :class:`~.InfluxDBClient` + :param healing_delay: the delay in seconds, counting from last failure of + a server, before re-adding server to the list of working servers. + Defaults to 15 minutes (900 seconds) + """ + + def __init__(self, + hosts=[('localhost', 8086)], + username='root', + password='root', + database=None, + ssl=False, + verify_ssl=False, + timeout=None, + use_udp=False, + udp_port=4444, + shuffle=True, + client_base_class=InfluxDBClient, + healing_delay=900, + ): + self.clients = [self] # Keep it backwards compatible + self.hosts = hosts + self.bad_hosts = [] # Corresponding server has failures in history + self.shuffle = shuffle + self.healing_delay = healing_delay + self._last_healing = time.time() + host, port = self.hosts[0] + self._hosts_lock = threading.Lock() + self._thread_local = threading.local() + self._client = client_base_class(host=host, + port=port, + username=username, + password=password, + database=database, + ssl=ssl, + verify_ssl=verify_ssl, + timeout=timeout, + use_udp=use_udp, + udp_port=udp_port) + for method in dir(client_base_class): + orig_attr = getattr(client_base_class, method, '') + if method.startswith('_') or not callable(orig_attr): + continue + + setattr(self, method, self._make_func(orig_attr)) + + self._client._get_host = self._get_host + self._client._get_port = self._get_port + self._client._get_baseurl = self._get_baseurl + self._update_client_host(self.hosts[0]) + + @staticmethod + def from_DSN(dsn, client_base_class=InfluxDBClient, + shuffle=True, **kwargs): + """Same as :meth:`~.InfluxDBClient.from_DSN`, but supports + multiple servers. + + :param shuffle: whether the queries should hit servers + evenly(randomly), defaults to True + :type shuffle: bool + :param client_base_class: the base class for all clients in the + cluster. This parameter is used to enable the support of + different client types. Defaults to :class:`~.InfluxDBClient` + + :Example: + + :: + + >> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\ +@host1:8086,usr:pwd@host2:8086/db_name', timeout=5) + >> type(cluster) + + >> cluster.hosts + [('host1', 8086), ('host2', 8086)] + >> cluster._client + ] + """ + init_args = parse_dsn(dsn) + init_args.update(**kwargs) + init_args['shuffle'] = shuffle + init_args['client_base_class'] = client_base_class + cluster_client = InfluxDBClusterClient(**init_args) + return cluster_client + + def _update_client_host(self, host): + self._thread_local.host, self._thread_local.port = host + self._thread_local.baseurl = "{0}://{1}:{2}".format( + self._client._scheme, + self._client._host, + self._client._port + ) + + def _get_baseurl(self): + return self._thread_local.baseurl + + def _get_host(self): + return self._thread_local.host + + def _get_port(self): + return self._thread_local.port + + def _make_func(self, orig_func): + + @wraps(orig_func) + def func(*args, **kwargs): + now = time.time() + with self._hosts_lock: + if (self.bad_hosts and + self._last_healing + self.healing_delay < now): + h = self.bad_hosts.pop(0) + self.hosts.append(h) + self._last_healing = now + + if self.shuffle: + random.shuffle(self.hosts) + + hosts = self.hosts + self.bad_hosts + + for h in hosts: + bad_host = False + try: + self._update_client_host(h) + return orig_func(self._client, *args, **kwargs) + except InfluxDBClientError as e: + # Errors caused by user's requests, re-raise + raise e + except ValueError as e: + raise e + except Exception as e: + # Errors that might caused by server failure, try another + bad_host = True + with self._hosts_lock: + if h in self.hosts: + self.hosts.remove(h) + self.bad_hosts.append(h) + self._last_healing = now + finally: + with self._hosts_lock: + if not bad_host and h in self.bad_hosts: + self.bad_hosts.remove(h) + self.hosts.append(h) + + raise InfluxDBServerError("InfluxDB: no viable server!") + + return func + + def parse_dsn(dsn): conn_params = urlparse(dsn) init_args = {} diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index f586df3f..61379235 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -23,6 +23,7 @@ import requests import requests.exceptions import socket +import time import requests_mock import random from nose.tools import raises @@ -32,6 +33,7 @@ import unittest from influxdb import InfluxDBClient +from influxdb.client import InfluxDBServerError, InfluxDBClusterClient def _build_response_object(status_code=200, content=""): @@ -811,3 +813,125 @@ def query(self, raise Exception("Fail Twice") else: return "Success" + + +class TestInfluxDBClusterClient(unittest.TestCase): + + def setUp(self): + # By default, raise exceptions on warnings + warnings.simplefilter('error', FutureWarning) + + self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)] + self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db' + + def test_init(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + username='username', + password='password', + database='database', + shuffle=False, + client_base_class=FakeClient) + self.assertEqual(3, len(cluster.hosts)) + self.assertEqual(0, len(cluster.bad_hosts)) + self.assertIn((cluster._client._host, + cluster._client._port), cluster.hosts) + + def test_one_server_fails(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=False, + client_base_class=FakeClient) + self.assertEqual('Success', cluster.query('Fail once')) + self.assertEqual(2, len(cluster.hosts)) + self.assertEqual(1, len(cluster.bad_hosts)) + + def test_two_servers_fail(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=False, + client_base_class=FakeClient) + self.assertEqual('Success', cluster.query('Fail twice')) + self.assertEqual(1, len(cluster.hosts)) + self.assertEqual(2, len(cluster.bad_hosts)) + + def test_all_fail(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=True, + client_base_class=FakeClient) + with self.assertRaises(InfluxDBServerError): + cluster.query('Fail') + self.assertEqual(0, len(cluster.hosts)) + self.assertEqual(3, len(cluster.bad_hosts)) + + def test_all_good(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=True, + client_base_class=FakeClient) + self.assertEqual('Success', cluster.query('')) + self.assertEqual(3, len(cluster.hosts)) + self.assertEqual(0, len(cluster.bad_hosts)) + + def test_recovery(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=True, + client_base_class=FakeClient) + with self.assertRaises(InfluxDBServerError): + cluster.query('Fail') + self.assertEqual('Success', cluster.query('')) + self.assertEqual(1, len(cluster.hosts)) + self.assertEqual(2, len(cluster.bad_hosts)) + + def test_healing(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=True, + healing_delay=1, + client_base_class=FakeClient) + with self.assertRaises(InfluxDBServerError): + cluster.query('Fail') + self.assertEqual('Success', cluster.query('')) + time.sleep(1.1) + self.assertEqual('Success', cluster.query('')) + self.assertEqual(2, len(cluster.hosts)) + self.assertEqual(1, len(cluster.bad_hosts)) + time.sleep(1.1) + self.assertEqual('Success', cluster.query('')) + self.assertEqual(3, len(cluster.hosts)) + self.assertEqual(0, len(cluster.bad_hosts)) + + def test_dsn(self): + cli = InfluxDBClusterClient.from_DSN(self.dsn_string) + self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts) + self.assertEqual('http://host1:8086', cli._client._baseurl) + self.assertEqual('uSr', cli._client._username) + self.assertEqual('pWd', cli._client._password) + self.assertEqual('db', cli._client._database) + self.assertFalse(cli._client.use_udp) + + cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string) + self.assertTrue(cli._client.use_udp) + + cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string) + self.assertEqual('https://host1:8086', cli._client._baseurl) + + cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string, + **{'ssl': False}) + self.assertEqual('http://host1:8086', cli._client._baseurl) + + def test_dsn_password_caps(self): + cli = InfluxDBClusterClient.from_DSN( + 'https+influxdb://usr:pWd@host:8086/db') + self.assertEqual('pWd', cli._client._password) + + def test_dsn_mixed_scheme_case(self): + cli = InfluxDBClusterClient.from_DSN( + 'hTTps+inFLUxdb://usr:pWd@host:8086/db') + self.assertEqual('pWd', cli._client._password) + self.assertEqual('https://host:8086', cli._client._baseurl) + + cli = InfluxDBClusterClient.from_DSN( + 'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db') + self.assertTrue(cli._client.use_udp)