Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit 7debaca

Browse files
gakeraviau
authored andcommitted
Merge pull request #354 from gaker/remove-cluster-client (Thanks @gaker!)
Remove cluster client
1 parent 9a2caa8 commit 7debaca

File tree

5 files changed

+1
-319
lines changed

5 files changed

+1
-319
lines changed

README.rst

-14
Original file line numberDiff line numberDiff line change
@@ -108,20 +108,6 @@ Here's a basic example (for more see the examples directory)::
108108

109109
>>> print("Result: {0}".format(result))
110110

111-
If you want to connect to a cluster, you could initialize a ``InfluxDBClusterClient``::
112-
113-
$ python
114-
115-
>>> from influxdb import InfluxDBClusterClient
116-
117-
>>> cc = InfluxDBClusterClient(hosts = [('192.168.0.1', 8086),
118-
('192.168.0.2', 8086),
119-
('192.168.0.3', 8086)],
120-
username='root',
121-
password='root',
122-
database='example')
123-
124-
``InfluxDBClusterClient`` has the same methods as ``InfluxDBClient``, it basically is a proxy to multiple InfluxDBClients.
125111

126112
Testing
127113
=======

docs/source/api-documentation.rst

-10
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,6 @@ These clients are initiated in the same way as the
4545
:members:
4646
:undoc-members:
4747

48-
------------------------------
49-
:class:`InfluxDBClusterClient`
50-
------------------------------
51-
52-
53-
.. currentmodule:: influxdb.InfluxDBClusterClient
54-
.. autoclass:: influxdb.InfluxDBClusterClient
55-
:members:
56-
:undoc-members:
57-
5848
------------------------
5949
:class:`DataFrameClient`
6050
------------------------

influxdb/__init__.py

-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@
66
from __future__ import unicode_literals
77

88
from .client import InfluxDBClient
9-
from .client import InfluxDBClusterClient
109
from .dataframe_client import DataFrameClient
1110
from .helper import SeriesHelper
1211

1312

1413
__all__ = [
1514
'InfluxDBClient',
16-
'InfluxDBClusterClient',
1715
'DataFrameClient',
1816
'SeriesHelper',
1917
]

influxdb/client.py

-168
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@
77
from __future__ import print_function
88
from __future__ import unicode_literals
99

10-
from functools import wraps
1110
import json
1211
import socket
13-
import time
14-
import threading
15-
import random
1612
import requests
1713
import requests.exceptions
1814
from sys import version_info
@@ -114,8 +110,6 @@ def __init__(self,
114110
'Accept': 'text/plain'
115111
}
116112

117-
# _baseurl, _host and _port are properties to allow InfluxDBClusterClient
118-
# to override them with thread-local variables
119113
@property
120114
def _baseurl(self):
121115
return self._get_baseurl()
@@ -753,168 +747,6 @@ def send_packet(self, packet):
753747
self.udp_socket.sendto(data, (self._host, self.udp_port))
754748

755749

756-
class InfluxDBClusterClient(object):
757-
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
758-
to a cluster of InfluxDB servers. Each query hits different host from the
759-
list of hosts.
760-
761-
:param hosts: all hosts to be included in the cluster, each of which
762-
should be in the format (address, port),
763-
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to
764-
[('localhost', 8086)]
765-
:type hosts: list of tuples
766-
:param shuffle: whether the queries should hit servers evenly(randomly),
767-
defaults to True
768-
:type shuffle: bool
769-
:param client_base_class: the base class for the cluster client.
770-
This parameter is used to enable the support of different client
771-
types. Defaults to :class:`~.InfluxDBClient`
772-
:param healing_delay: the delay in seconds, counting from last failure of
773-
a server, before re-adding server to the list of working servers.
774-
Defaults to 15 minutes (900 seconds)
775-
"""
776-
777-
def __init__(self,
778-
hosts=[('localhost', 8086)],
779-
username='root',
780-
password='root',
781-
database=None,
782-
ssl=False,
783-
verify_ssl=False,
784-
timeout=None,
785-
use_udp=False,
786-
udp_port=4444,
787-
shuffle=True,
788-
client_base_class=InfluxDBClient,
789-
healing_delay=900,
790-
):
791-
self.clients = [self] # Keep it backwards compatible
792-
self.hosts = hosts
793-
self.bad_hosts = [] # Corresponding server has failures in history
794-
self.shuffle = shuffle
795-
self.healing_delay = healing_delay
796-
self._last_healing = time.time()
797-
host, port = self.hosts[0]
798-
self._hosts_lock = threading.Lock()
799-
self._thread_local = threading.local()
800-
self._client = client_base_class(host=host,
801-
port=port,
802-
username=username,
803-
password=password,
804-
database=database,
805-
ssl=ssl,
806-
verify_ssl=verify_ssl,
807-
timeout=timeout,
808-
use_udp=use_udp,
809-
udp_port=udp_port)
810-
for method in dir(client_base_class):
811-
orig_attr = getattr(client_base_class, method, '')
812-
if method.startswith('_') or not callable(orig_attr):
813-
continue
814-
815-
setattr(self, method, self._make_func(orig_attr))
816-
817-
self._client._get_host = self._get_host
818-
self._client._get_port = self._get_port
819-
self._client._get_baseurl = self._get_baseurl
820-
self._update_client_host(self.hosts[0])
821-
822-
@staticmethod
823-
def from_DSN(dsn, client_base_class=InfluxDBClient,
824-
shuffle=True, **kwargs):
825-
"""Same as :meth:`~.InfluxDBClient.from_DSN`, but supports
826-
multiple servers.
827-
828-
:param shuffle: whether the queries should hit servers
829-
evenly(randomly), defaults to True
830-
:type shuffle: bool
831-
:param client_base_class: the base class for all clients in the
832-
cluster. This parameter is used to enable the support of
833-
different client types. Defaults to :class:`~.InfluxDBClient`
834-
835-
:Example:
836-
837-
::
838-
839-
>> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\
840-
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
841-
>> type(cluster)
842-
<class 'influxdb.client.InfluxDBClusterClient'>
843-
>> cluster.hosts
844-
[('host1', 8086), ('host2', 8086)]
845-
>> cluster._client
846-
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
847-
"""
848-
init_args = parse_dsn(dsn)
849-
init_args.update(**kwargs)
850-
init_args['shuffle'] = shuffle
851-
init_args['client_base_class'] = client_base_class
852-
cluster_client = InfluxDBClusterClient(**init_args)
853-
return cluster_client
854-
855-
def _update_client_host(self, host):
856-
self._thread_local.host, self._thread_local.port = host
857-
self._thread_local.baseurl = "{0}://{1}:{2}".format(
858-
self._client._scheme,
859-
self._client._host,
860-
self._client._port
861-
)
862-
863-
def _get_baseurl(self):
864-
return self._thread_local.baseurl
865-
866-
def _get_host(self):
867-
return self._thread_local.host
868-
869-
def _get_port(self):
870-
return self._thread_local.port
871-
872-
def _make_func(self, orig_func):
873-
874-
@wraps(orig_func)
875-
def func(*args, **kwargs):
876-
now = time.time()
877-
with self._hosts_lock:
878-
if (self.bad_hosts and
879-
self._last_healing + self.healing_delay < now):
880-
h = self.bad_hosts.pop(0)
881-
self.hosts.append(h)
882-
self._last_healing = now
883-
884-
if self.shuffle:
885-
random.shuffle(self.hosts)
886-
887-
hosts = self.hosts + self.bad_hosts
888-
889-
for h in hosts:
890-
bad_host = False
891-
try:
892-
self._update_client_host(h)
893-
return orig_func(self._client, *args, **kwargs)
894-
except InfluxDBClientError as e:
895-
# Errors caused by user's requests, re-raise
896-
raise e
897-
except ValueError as e:
898-
raise e
899-
except Exception as e:
900-
# Errors that might caused by server failure, try another
901-
bad_host = True
902-
with self._hosts_lock:
903-
if h in self.hosts:
904-
self.hosts.remove(h)
905-
self.bad_hosts.append(h)
906-
self._last_healing = now
907-
finally:
908-
with self._hosts_lock:
909-
if not bad_host and h in self.bad_hosts:
910-
self.bad_hosts.remove(h)
911-
self.hosts.append(h)
912-
913-
raise InfluxDBServerError("InfluxDB: no viable server!")
914-
915-
return func
916-
917-
918750
def parse_dsn(dsn):
919751
conn_params = urlparse(dsn)
920752
init_args = {}

influxdb/tests/client_test.py

+1-125
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import requests
2424
import requests.exceptions
2525
import socket
26-
import time
2726
import requests_mock
2827
import random
2928
from nose.tools import raises
@@ -32,8 +31,7 @@
3231
import mock
3332
import unittest
3433

35-
from influxdb import InfluxDBClient, InfluxDBClusterClient
36-
from influxdb.client import InfluxDBServerError
34+
from influxdb import InfluxDBClient
3735

3836

3937
def _build_response_object(status_code=200, content=""):
@@ -813,125 +811,3 @@ def query(self,
813811
raise Exception("Fail Twice")
814812
else:
815813
return "Success"
816-
817-
818-
class TestInfluxDBClusterClient(unittest.TestCase):
819-
820-
def setUp(self):
821-
# By default, raise exceptions on warnings
822-
warnings.simplefilter('error', FutureWarning)
823-
824-
self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)]
825-
self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db'
826-
827-
def test_init(self):
828-
cluster = InfluxDBClusterClient(hosts=self.hosts,
829-
username='username',
830-
password='password',
831-
database='database',
832-
shuffle=False,
833-
client_base_class=FakeClient)
834-
self.assertEqual(3, len(cluster.hosts))
835-
self.assertEqual(0, len(cluster.bad_hosts))
836-
self.assertIn((cluster._client._host,
837-
cluster._client._port), cluster.hosts)
838-
839-
def test_one_server_fails(self):
840-
cluster = InfluxDBClusterClient(hosts=self.hosts,
841-
database='database',
842-
shuffle=False,
843-
client_base_class=FakeClient)
844-
self.assertEqual('Success', cluster.query('Fail once'))
845-
self.assertEqual(2, len(cluster.hosts))
846-
self.assertEqual(1, len(cluster.bad_hosts))
847-
848-
def test_two_servers_fail(self):
849-
cluster = InfluxDBClusterClient(hosts=self.hosts,
850-
database='database',
851-
shuffle=False,
852-
client_base_class=FakeClient)
853-
self.assertEqual('Success', cluster.query('Fail twice'))
854-
self.assertEqual(1, len(cluster.hosts))
855-
self.assertEqual(2, len(cluster.bad_hosts))
856-
857-
def test_all_fail(self):
858-
cluster = InfluxDBClusterClient(hosts=self.hosts,
859-
database='database',
860-
shuffle=True,
861-
client_base_class=FakeClient)
862-
with self.assertRaises(InfluxDBServerError):
863-
cluster.query('Fail')
864-
self.assertEqual(0, len(cluster.hosts))
865-
self.assertEqual(3, len(cluster.bad_hosts))
866-
867-
def test_all_good(self):
868-
cluster = InfluxDBClusterClient(hosts=self.hosts,
869-
database='database',
870-
shuffle=True,
871-
client_base_class=FakeClient)
872-
self.assertEqual('Success', cluster.query(''))
873-
self.assertEqual(3, len(cluster.hosts))
874-
self.assertEqual(0, len(cluster.bad_hosts))
875-
876-
def test_recovery(self):
877-
cluster = InfluxDBClusterClient(hosts=self.hosts,
878-
database='database',
879-
shuffle=True,
880-
client_base_class=FakeClient)
881-
with self.assertRaises(InfluxDBServerError):
882-
cluster.query('Fail')
883-
self.assertEqual('Success', cluster.query(''))
884-
self.assertEqual(1, len(cluster.hosts))
885-
self.assertEqual(2, len(cluster.bad_hosts))
886-
887-
def test_healing(self):
888-
cluster = InfluxDBClusterClient(hosts=self.hosts,
889-
database='database',
890-
shuffle=True,
891-
healing_delay=1,
892-
client_base_class=FakeClient)
893-
with self.assertRaises(InfluxDBServerError):
894-
cluster.query('Fail')
895-
self.assertEqual('Success', cluster.query(''))
896-
time.sleep(1.1)
897-
self.assertEqual('Success', cluster.query(''))
898-
self.assertEqual(2, len(cluster.hosts))
899-
self.assertEqual(1, len(cluster.bad_hosts))
900-
time.sleep(1.1)
901-
self.assertEqual('Success', cluster.query(''))
902-
self.assertEqual(3, len(cluster.hosts))
903-
self.assertEqual(0, len(cluster.bad_hosts))
904-
905-
def test_dsn(self):
906-
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
907-
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)
908-
self.assertEqual('http://host1:8086', cli._client._baseurl)
909-
self.assertEqual('uSr', cli._client._username)
910-
self.assertEqual('pWd', cli._client._password)
911-
self.assertEqual('db', cli._client._database)
912-
self.assertFalse(cli._client.use_udp)
913-
914-
cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string)
915-
self.assertTrue(cli._client.use_udp)
916-
917-
cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string)
918-
self.assertEqual('https://host1:8086', cli._client._baseurl)
919-
920-
cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string,
921-
**{'ssl': False})
922-
self.assertEqual('http://host1:8086', cli._client._baseurl)
923-
924-
def test_dsn_password_caps(self):
925-
cli = InfluxDBClusterClient.from_DSN(
926-
'https+influxdb://usr:pWd@host:8086/db')
927-
self.assertEqual('pWd', cli._client._password)
928-
929-
def test_dsn_mixed_scheme_case(self):
930-
cli = InfluxDBClusterClient.from_DSN(
931-
'hTTps+inFLUxdb://usr:pWd@host:8086/db')
932-
self.assertEqual('pWd', cli._client._password)
933-
self.assertEqual('https://host:8086', cli._client._baseurl)
934-
935-
cli = InfluxDBClusterClient.from_DSN(
936-
'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db')
937-
self.assertTrue(cli._client.use_udp)

0 commit comments

Comments
 (0)