# Source code for kafka_connect_watcher.cluster
# SPDX-License-Identifier: MPL-2.0
# Copyright 2023 John "Preston" Mille <john@ews-network.net>
"""
Models Connect cluster
"""
from __future__ import annotations
from copy import deepcopy
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from kafka_connect_watcher.config import Config
from aws_embedded_metrics.config import get_config
from compose_x_common.compose_x_common import keyisset, set_else_none
from kafka_connect_api.kafka_connect_api import Api, Cluster, Connector
from prometheus_client import Gauge
from kafka_connect_watcher.config import EmfConfig
from kafka_connect_watcher.error_rules import EvaluationRule
# Module-level configuration object returned by aws_embedded_metrics'
# get_config(); it is fetched once, when this module is imported.
emf_config = get_config()
class ConnectCluster:
    """
    The ConnectCluster manages connection and scan of the connectors status execution.
    It also collects metrics about itself.
    """

    def __init__(self, cluster_config: dict, watcher_config: Config):
        """
        Build the API client, the evaluation rules and the metrics settings
        from the cluster definition.

        :param cluster_config: Cluster definition. ``hostname`` is always read.
            ``name``, ``port`` (falls back to 8083), ``url``, ``authentication``
            (``username`` / ``password``), ``metrics`` (``aws_emf`` /
            ``prometheus``) and the ``EvaluationRule.config_key`` entry are
            looked up with a fallback value.
        :param watcher_config: Watcher configuration handed to every EvaluationRule.
        :raises TypeError: if ``cluster_config`` is not a dict.
        :raises KeyError: if ``cluster_config`` has no ``hostname`` key.
        """
        if not isinstance(cluster_config, dict):
            # Single formatted message: passing the type as a second positional
            # argument made the exception render as a tuple.
            raise TypeError(
                f"cluster_config must be a dict. Got {type(cluster_config)}"
            )
        self.definition: dict = cluster_config
        # Attribute name kept verbatim (typos included): code outside this
        # class may already reference it.
        self._orignial_definiton: dict = deepcopy(cluster_config)
        self._name = set_else_none("name", cluster_config)
        self._port = int(set_else_none("port", cluster_config, 8083))

        # Look the authentication section up once and reuse it for both fields.
        auth = set_else_none("authentication", cluster_config)
        url = set_else_none("url", cluster_config)
        username = set_else_none("username", auth)
        password = set_else_none("password", auth)

        if url:
            self._api = Api(
                self.hostname,
                url=url,
                username=username,
                password=password,
            )
        else:
            self._api = Api(
                self.hostname,
                port=self._port,
                username=username,
                password=password,
            )

        # Always define the attribute so the ``cluster`` property cannot fail
        # with a missing-attribute error when the Cluster constructor raises.
        self._cluster = None
        try:
            self._cluster = Cluster(self.api)
        except Exception as error:
            # Best effort, as before: report the failure and keep going.
            print(error)

        # Fall back to an empty list so a definition without rules does not
        # fail while iterating over None.
        self.handling_rules: list[EvaluationRule] = [
            EvaluationRule(config, watcher_config)
            for config in set_else_none(
                EvaluationRule.config_key, self.definition, []
            )
        ]

        self.metrics_config: dict = set_else_none("metrics", self.definition, {})
        self.emf_config: EmfConfig = (
            EmfConfig(self.metrics_config["aws_emf"])
            if keyisset("aws_emf", self.metrics_config)
            else None
        )
        self.prometheus_config: dict = set_else_none(
            "prometheus", self.metrics_config, {}
        )
        self.emf_namespace = None
        self.metrics: dict = {"connectors": {}}

    @property
    def hostname(self) -> str:
        """Value of the ``hostname`` key of the cluster definition."""
        return self.definition["hostname"]

    @property
    def api(self) -> Api:
        """API client created in ``__init__``."""
        return self._api

    @property
    def cluster(self) -> Cluster:
        """Cluster object created in ``__init__``, or None if its creation failed."""
        return self._cluster

    @property
    def name(self) -> str:
        """Configured name when one is set, otherwise ``<hostname>_<port>``."""
        if self._name:
            return self._name
        return f"{self.hostname}_{self.port}"

    @property
    def port(self) -> int:
        """Port read from the cluster definition (8083 when not set)."""
        return self._port

    def emf_high_resolution(self) -> bool:
        """
        Result of ``keyisset("high_resolution_metrics", self.emf_config)``.

        NOTE(review): ``self.emf_config`` is an EmfConfig instance or None, not
        the raw dict -- confirm that keyisset handles that type as intended.
        """
        return keyisset("high_resolution_metrics", self.emf_config)