patroni.dcs.consul module

class patroni.dcs.consul.Consul(config: Dict[str, Any], mpp: patroni.postgresql.mpp.AbstractMPP)

Bases: patroni.dcs.AbstractDCS

__init__(config: Dict[str, Any], mpp: patroni.postgresql.mpp.AbstractMPP) → None

Prepare DCS paths, MPP object, initial values for state information and processing dependencies.

Variables:configdict, reference to config section of selected DCS. i.e.: zookeeper for zookeeper, etcd for etcd, etc…
_abc_impl = <_abc_data object>
_cluster_from_nodes(nodes: Dict[str, Any]) → patroni.dcs.Cluster
_consistency
_delete_leader(**kwargs) → Any
_do_attempt_to_acquire_leader(retry: patroni.utils.Retry) → bool
_do_refresh_session(force: bool = False) → bool
Returns:!True if it had to create new session
_load_cluster(path: str, loader: Callable[[str], Union[patroni.dcs.Cluster, Dict[int, patroni.dcs.Cluster]]]) → Union[patroni.dcs.Cluster, Dict[int, patroni.dcs.Cluster]]

Main abstract method that implements the loading of Cluster instance.

Note

Internally this method should call the loader method that will build Cluster object which represents current state and topology of the cluster in DCS. This method supposed to be called only by the get_cluster() method.

Parameters:
  • path – the path in DCS where to load Cluster(s) from.
  • loader – one of _postgresql_cluster_loader() or _mpp_cluster_loader().
Raise:

DCSError in case of communication problems with DCS. If the current node was running as a primary and exception raised, instance would be demoted.

_mpp_cluster_loader(path: str) → Dict[int, patroni.dcs.Cluster]

Load and build all PostgreSQL clusters from a single MPP cluster.

Parameters:path – the path in DCS where to load Cluster(s) from.
Returns:all MPP groups as dict, with group IDs as keys and Cluster objects as values.
_postgresql_cluster_loader(path: str) → patroni.dcs.Cluster

Load and build the Cluster object from DCS, which represents a single PostgreSQL cluster.

Parameters:path – the path in DCS where to load Cluster from.
Returns:Cluster instance.
static _run_and_handle_exceptions(method: Callable[[...], Any], *args, **kwargs) → Any
_set_service_name() → None
_update_leader(**kwargs)
_update_service(data: Dict[str, Any]) → Optional[bool]
_write_failsafe(**kwargs) → Any
_write_leader_optime(**kwargs) → Any
_write_status(**kwargs) → Any
adjust_ttl() → None
attempt_to_acquire_leader(**kwargs)
cancel_initialization(**kwargs) → Any
create_session() → None
delete_cluster(**kwargs) → Any
delete_sync_state(**kwargs) → Any
deregister_service(**kwargs) → Any
initialize(**kwargs) → Any
static member(node: Dict[str, str]) → patroni.dcs.Member
refresh_session() → bool
register_service(**kwargs) → Any
reload_config(config: Union[Config, Dict[str, Any]]) → None

Load and set relevant values from configuration.

Sets loop_wait, ttl and retry_timeout properties.

Parameters:config – Loaded configuration information object or dictionary of key value pairs.
retry(method: Callable[[...], Any], *args, **kwargs) → Any
set_config_value(**kwargs) → Any
set_failover_value(**kwargs) → Any
set_history_value(**kwargs) → Any
set_retry_timeout(retry_timeout: int) → None

Set the new value for retry_timeout.

set_sync_state_value(**kwargs) → Any
set_ttl(ttl: int) → Optional[bool]

Set the new ttl value for DCS keys.

take_leader() → bool

Establish a new leader in DCS.

Note

This method should create leader key with value of _name and ttl of ttl.

Since it could be called only on initial cluster bootstrap it could create this key regardless, overwriting the key if necessary.

Returns:True if successfully committed to DCS.
touch_member(**kwargs) → Any
ttl

Get current ttl value.

update_service(**kwargs) → Any
watch(leader_version: Optional[int], timeout: float) → bool

Sleep if the current node is a leader, otherwise, watch for changes of leader key with a given timeout.

Parameters:
  • leader_version – version of a leader key.
  • timeout – timeout in seconds.
Returns:

if True this will reschedule the next run of the HA cycle.

class patroni.dcs.consul.ConsulClient(*args, **kwargs)

Bases: consul.base.Consul

__init__(*args, **kwargs) → None

Consul client with Patroni customisations.

Note

Parameters, token, cert and ca_cert are not passed to the parent class consul.base.Consul.

Original class documentation,

token is an optional ACL token. If supplied it will be used by default for all requests made with this client session. It’s still possible to override this token by passing a token explicitly for a request.

consistency sets the consistency mode to use by default for all reads that support the consistency option. It’s still possible to override this by passing explicitly for a given request. consistency can be either ‘default’, ‘consistent’ or ‘stale’.

dc is the datacenter that this agent will communicate with. By default, the datacenter of the host is used.

verify is whether to verify the SSL certificate for HTTPS requests

cert client side certificates for HTTPS requests

Parameters:
  • args – positional arguments to pass to consul.base.Consul
  • kwargs – keyword arguments, with cert, ca_cert and token removed, passed to consul.base.Consul
connect(*args, **kwargs) → patroni.dcs.consul.HTTPClient
http_connect(*args, **kwargs) → patroni.dcs.consul.HTTPClient
reload_config(config: Dict[str, Any]) → None
exception patroni.dcs.consul.ConsulError(value: Any)

Bases: patroni.exceptions.DCSError

exception patroni.dcs.consul.ConsulInternalError

Bases: consul.base.ConsulException

An internal Consul server error occurred

class patroni.dcs.consul.HTTPClient(host: str = '127.0.0.1', port: int = 8500, token: Optional[str] = None, scheme: str = 'http', verify: bool = True, cert: Optional[str] = None, ca_cert: Optional[str] = None)

Bases: object

__init__(host: str = '127.0.0.1', port: int = 8500, token: Optional[str] = None, scheme: str = 'http', verify: bool = True, cert: Optional[str] = None, ca_cert: Optional[str] = None) → None

Initialize self. See help(type(self)) for accurate signature.

static response(response: urllib3.response.HTTPResponse) → patroni.dcs.consul.Response
set_read_timeout(timeout: float) → None
set_ttl(ttl: int) → bool
ttl
uri(path: str, params: Union[None, Dict[str, Any], List[Tuple[str, Any]], Tuple[Tuple[str, Any], ...]] = None) → str
exception patroni.dcs.consul.InvalidSession

Bases: consul.base.ConsulException

invalid session

exception patroni.dcs.consul.InvalidSessionTTL

Bases: consul.base.ConsulException

Session TTL is too small or too big

class patroni.dcs.consul.Response(code, headers, body, content)

Bases: tuple

_asdict()

Return a new dict which maps field names to their values.

_field_defaults = {}
_field_types = {'body': <class 'str'>, 'code': <class 'int'>, 'content': <class 'bytes'>, 'headers': typing.Union[typing.Mapping[str, str], typing.Mapping[bytes, bytes], NoneType]}
_fields = ('code', 'headers', 'body', 'content')
_fields_defaults = {}
classmethod _make(iterable)

Make a new Response object from a sequence or iterable

_replace(**kwds)

Return a new Response object replacing specified fields with new values

body

Alias for field number 2

code

Alias for field number 0

content

Alias for field number 3

headers

Alias for field number 1

patroni.dcs.consul.catch_consul_errors(func: Callable[[...], Any]) → Callable[[...], Any]
patroni.dcs.consul.force_if_last_failed(func: Callable[[...], Any]) → Callable[[...], Any]
patroni.dcs.consul.service_name_from_scope_name(scope_name: str) → str

Translate scope name to service name which can be used in dns.

230 = 253 - len(‘replica.’) - len(‘.service.consul’)