diff --git a/README.rst b/README.rst index 10f8317b..d3a8d775 100644 --- a/README.rst +++ b/README.rst @@ -108,20 +108,6 @@ Here's a basic example (for more see the examples directory):: >>> print("Result: {0}".format(result)) -If you want to connect to a cluster, you could initialize a ``InfluxDBClusterClient``:: - - $ python - - >>> from influxdb import InfluxDBClusterClient - - >>> cc = InfluxDBClusterClient(hosts = [('192.168.0.1', 8086), - ('192.168.0.2', 8086), - ('192.168.0.3', 8086)], - username='root', - password='root', - database='example') - -``InfluxDBClusterClient`` has the same methods as ``InfluxDBClient``, it basically is a proxy to multiple InfluxDBClients. Testing ======= diff --git a/docs/source/api-documentation.rst b/docs/source/api-documentation.rst index c6178fed..d00600e6 100644 --- a/docs/source/api-documentation.rst +++ b/docs/source/api-documentation.rst @@ -45,16 +45,6 @@ These clients are initiated in the same way as the :members: :undoc-members: ------------------------------- -:class:`InfluxDBClusterClient` ------------------------------- - - -.. currentmodule:: influxdb.InfluxDBClusterClient -.. autoclass:: influxdb.InfluxDBClusterClient - :members: - :undoc-members: - ------------------------ :class:`DataFrameClient` ------------------------ diff --git a/influxdb/__init__.py b/influxdb/__init__.py index ad0f6052..fc2e4261 100644 --- a/influxdb/__init__.py +++ b/influxdb/__init__.py @@ -6,14 +6,12 @@ from __future__ import unicode_literals from .client import InfluxDBClient -from .client import InfluxDBClusterClient from .dataframe_client import DataFrameClient from .helper import SeriesHelper __all__ = [ 'InfluxDBClient', - 'InfluxDBClusterClient', 'DataFrameClient', 'SeriesHelper', ] diff --git a/influxdb/client.py b/influxdb/client.py index 5e60011c..4eff5157 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -7,12 +7,8 @@ from __future__ import print_function from __future__ import unicode_literals -from functools import wraps import json import socket -import time -import threading -import random import requests import requests.exceptions from sys import version_info @@ -114,8 +110,6 @@ def __init__(self, 'Accept': 'text/plain' } - # _baseurl, _host and _port are properties to allow InfluxDBClusterClient - # to override them with thread-local variables @property def _baseurl(self): return self._get_baseurl() @@ -772,168 +766,6 @@ def send_packet(self, packet): self.udp_socket.sendto(data, (self._host, self.udp_port)) -class InfluxDBClusterClient(object): - """The :class:`~.InfluxDBClusterClient` is the client for connecting - to a cluster of InfluxDB servers. Each query hits different host from the - list of hosts. - - :param hosts: all hosts to be included in the cluster, each of which - should be in the format (address, port), - e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to - [('localhost', 8086)] - :type hosts: list of tuples - :param shuffle: whether the queries should hit servers evenly(randomly), - defaults to True - :type shuffle: bool - :param client_base_class: the base class for the cluster client. - This parameter is used to enable the support of different client - types. Defaults to :class:`~.InfluxDBClient` - :param healing_delay: the delay in seconds, counting from last failure of - a server, before re-adding server to the list of working servers. - Defaults to 15 minutes (900 seconds) - """ - - def __init__(self, - hosts=[('localhost', 8086)], - username='root', - password='root', - database=None, - ssl=False, - verify_ssl=False, - timeout=None, - use_udp=False, - udp_port=4444, - shuffle=True, - client_base_class=InfluxDBClient, - healing_delay=900, - ): - self.clients = [self] # Keep it backwards compatible - self.hosts = hosts - self.bad_hosts = [] # Corresponding server has failures in history - self.shuffle = shuffle - self.healing_delay = healing_delay - self._last_healing = time.time() - host, port = self.hosts[0] - self._hosts_lock = threading.Lock() - self._thread_local = threading.local() - self._client = client_base_class(host=host, - port=port, - username=username, - password=password, - database=database, - ssl=ssl, - verify_ssl=verify_ssl, - timeout=timeout, - use_udp=use_udp, - udp_port=udp_port) - for method in dir(client_base_class): - orig_attr = getattr(client_base_class, method, '') - if method.startswith('_') or not callable(orig_attr): - continue - - setattr(self, method, self._make_func(orig_attr)) - - self._client._get_host = self._get_host - self._client._get_port = self._get_port - self._client._get_baseurl = self._get_baseurl - self._update_client_host(self.hosts[0]) - - @staticmethod - def from_DSN(dsn, client_base_class=InfluxDBClient, - shuffle=True, **kwargs): - """Same as :meth:`~.InfluxDBClient.from_DSN`, but supports - multiple servers. - - :param shuffle: whether the queries should hit servers - evenly(randomly), defaults to True - :type shuffle: bool - :param client_base_class: the base class for all clients in the - cluster. This parameter is used to enable the support of - different client types. Defaults to :class:`~.InfluxDBClient` - - :Example: - - :: - - >> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\ -@host1:8086,usr:pwd@host2:8086/db_name', timeout=5) - >> type(cluster) - - >> cluster.hosts - [('host1', 8086), ('host2', 8086)] - >> cluster._client - ] - """ - init_args = parse_dsn(dsn) - init_args.update(**kwargs) - init_args['shuffle'] = shuffle - init_args['client_base_class'] = client_base_class - cluster_client = InfluxDBClusterClient(**init_args) - return cluster_client - - def _update_client_host(self, host): - self._thread_local.host, self._thread_local.port = host - self._thread_local.baseurl = "{0}://{1}:{2}".format( - self._client._scheme, - self._client._host, - self._client._port - ) - - def _get_baseurl(self): - return self._thread_local.baseurl - - def _get_host(self): - return self._thread_local.host - - def _get_port(self): - return self._thread_local.port - - def _make_func(self, orig_func): - - @wraps(orig_func) - def func(*args, **kwargs): - now = time.time() - with self._hosts_lock: - if (self.bad_hosts and - self._last_healing + self.healing_delay < now): - h = self.bad_hosts.pop(0) - self.hosts.append(h) - self._last_healing = now - - if self.shuffle: - random.shuffle(self.hosts) - - hosts = self.hosts + self.bad_hosts - - for h in hosts: - bad_host = False - try: - self._update_client_host(h) - return orig_func(self._client, *args, **kwargs) - except InfluxDBClientError as e: - # Errors caused by user's requests, re-raise - raise e - except ValueError as e: - raise e - except Exception as e: - # Errors that might caused by server failure, try another - bad_host = True - with self._hosts_lock: - if h in self.hosts: - self.hosts.remove(h) - self.bad_hosts.append(h) - self._last_healing = now - finally: - with self._hosts_lock: - if not bad_host and h in self.bad_hosts: - self.bad_hosts.remove(h) - self.hosts.append(h) - - raise InfluxDBServerError("InfluxDB: no viable server!") - - return func - - def parse_dsn(dsn): conn_params = urlparse(dsn) init_args = {} diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index e602cdff..2d668c60 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -23,7 +23,6 @@ import requests import requests.exceptions import socket -import time import requests_mock import random from nose.tools import raises @@ -32,8 +31,7 @@ import mock import unittest -from influxdb import InfluxDBClient, InfluxDBClusterClient -from influxdb.client import InfluxDBServerError +from influxdb import InfluxDBClient def _build_response_object(status_code=200, content=""): @@ -846,125 +844,3 @@ def query(self, raise Exception("Fail Twice") else: return "Success" - - -class TestInfluxDBClusterClient(unittest.TestCase): - - def setUp(self): - # By default, raise exceptions on warnings - warnings.simplefilter('error', FutureWarning) - - self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)] - self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db' - - def test_init(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - username='username', - password='password', - database='database', - shuffle=False, - client_base_class=FakeClient) - self.assertEqual(3, len(cluster.hosts)) - self.assertEqual(0, len(cluster.bad_hosts)) - self.assertIn((cluster._client._host, - cluster._client._port), cluster.hosts) - - def test_one_server_fails(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - database='database', - shuffle=False, - client_base_class=FakeClient) - self.assertEqual('Success', cluster.query('Fail once')) - self.assertEqual(2, len(cluster.hosts)) - self.assertEqual(1, len(cluster.bad_hosts)) - - def test_two_servers_fail(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - database='database', - shuffle=False, - client_base_class=FakeClient) - self.assertEqual('Success', cluster.query('Fail twice')) - self.assertEqual(1, len(cluster.hosts)) - self.assertEqual(2, len(cluster.bad_hosts)) - - def test_all_fail(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - database='database', - shuffle=True, - client_base_class=FakeClient) - with self.assertRaises(InfluxDBServerError): - cluster.query('Fail') - self.assertEqual(0, len(cluster.hosts)) - self.assertEqual(3, len(cluster.bad_hosts)) - - def test_all_good(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - database='database', - shuffle=True, - client_base_class=FakeClient) - self.assertEqual('Success', cluster.query('')) - self.assertEqual(3, len(cluster.hosts)) - self.assertEqual(0, len(cluster.bad_hosts)) - - def test_recovery(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - database='database', - shuffle=True, - client_base_class=FakeClient) - with self.assertRaises(InfluxDBServerError): - cluster.query('Fail') - self.assertEqual('Success', cluster.query('')) - self.assertEqual(1, len(cluster.hosts)) - self.assertEqual(2, len(cluster.bad_hosts)) - - def test_healing(self): - cluster = InfluxDBClusterClient(hosts=self.hosts, - database='database', - shuffle=True, - healing_delay=1, - client_base_class=FakeClient) - with self.assertRaises(InfluxDBServerError): - cluster.query('Fail') - self.assertEqual('Success', cluster.query('')) - time.sleep(1.1) - self.assertEqual('Success', cluster.query('')) - self.assertEqual(2, len(cluster.hosts)) - self.assertEqual(1, len(cluster.bad_hosts)) - time.sleep(1.1) - self.assertEqual('Success', cluster.query('')) - self.assertEqual(3, len(cluster.hosts)) - self.assertEqual(0, len(cluster.bad_hosts)) - - def test_dsn(self): - cli = InfluxDBClusterClient.from_DSN(self.dsn_string) - self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts) - self.assertEqual('http://host1:8086', cli._client._baseurl) - self.assertEqual('uSr', cli._client._username) - self.assertEqual('pWd', cli._client._password) - self.assertEqual('db', cli._client._database) - self.assertFalse(cli._client.use_udp) - - cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string) - self.assertTrue(cli._client.use_udp) - - cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string) - self.assertEqual('https://host1:8086', cli._client._baseurl) - - cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string, - **{'ssl': False}) - self.assertEqual('http://host1:8086', cli._client._baseurl) - - def test_dsn_password_caps(self): - cli = InfluxDBClusterClient.from_DSN( - 'https+influxdb://usr:pWd@host:8086/db') - self.assertEqual('pWd', cli._client._password) - - def test_dsn_mixed_scheme_case(self): - cli = InfluxDBClusterClient.from_DSN( - 'hTTps+inFLUxdb://usr:pWd@host:8086/db') - self.assertEqual('pWd', cli._client._password) - self.assertEqual('https://host:8086', cli._client._baseurl) - - cli = InfluxDBClusterClient.from_DSN( - 'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db') - self.assertTrue(cli._client.use_udp)