Source code for kafka_connect_watcher.watcher
# SPDX-License-Identifier: MPL-2.0
# Copyright 2023 John "Preston" Mille <john@ews-network.net>
"""
Main entrypoint of the python watcher
"""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_connect_watcher.config import Config

import signal
import threading
from datetime import datetime as dt
from queue import Queue
from time import sleep

from kafka_connect_watcher.aws_emf import (
    handle_watcher_emf,
    init_emf_config,
    publish_clusters_emf,
)
from kafka_connect_watcher.cluster import ConnectCluster
from kafka_connect_watcher.logger import LOG
from kafka_connect_watcher.threads_settings import NUM_THREADS

FOREVER = 42
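# Truthy constant used to keep the worker threads looping until they receive
# the shutdown sentinel or the process exits.
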
class Watcher:
    """
    The Watcher class is the entry point of the program: it cycles over the different Connect clusters,
    collecting metrics and handling exceptions and graceful shutdowns.
    """

    def __init__(self):
        # Register signal handlers so SIGINT/SIGTERM stop the watch loop cleanly.
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
        self.keep_running: bool = True
        self.connect_clusters_processing_queue = Queue()
        self._threads: list[threading.Thread] = []
        self.metrics: dict = {
            "connect_clusters_total": 0,
            "connect_clusters_healthy": 0,
            "connect_clusters_unhealthy": 0,
        }
    def run(self, config: Config):
        LOG.info("Initializing the watcher")
        clusters: list[ConnectCluster] = [
            ConnectCluster(cluster, config) for cluster in config.config["clusters"]
        ]
        self.metrics.update({"connect_clusters_total": len(clusters)})
        init_emf_config(config)
        LOG.info("Watcher clusters initialized.")
        for _ in range(NUM_THREADS):
            _thread = threading.Thread(
                target=process_cluster,
                daemon=True,
                args=(self.connect_clusters_processing_queue,),
            )
            _thread.start()
            self._threads.append(_thread)
        LOG.info(
            "Watcher threads ({}) initialized. Processing clusters & evaluation rules.".format(
                NUM_THREADS
            )
        )
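        # Main watch loop: enqueue every cluster, wait for the workers to drain
        # the queue, publish watcher-level EMF metrics if configured, then sleep
        # until the next scan interval and reset the per-cycle health counters.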
        try:
            while self.keep_running:
                now = dt.now()
                LOG.info("Clusters processing started")
                for connect_cluster in clusters:
                    self.connect_clusters_processing_queue.put(
                        [
                            self,
                            config,
                            connect_cluster,
                        ],
                        False,
                    )
                self.connect_clusters_processing_queue.join()
                LOG.info(
                    "Clusters processing finished - {}s".format(
                        (dt.now() - now).total_seconds()
                    )
                )
                if config.emf_watcher_config:
                    handle_watcher_emf(config, self)
                sleep(config.scan_intervals)
                self.metrics.update(
                    {"connect_clusters_healthy": 0, "connect_clusters_unhealthy": 0}
                )
                LOG.debug("Watcher metrics: {}".format(self.metrics))
        except KeyboardInterrupt:
            self.keep_running = False
            LOG.debug("Exited due to keyboard interrupt")
    def exit_gracefully(self, signum, frame):
        """Signal handler: stops the watch loop and exits."""
        LOG.info("Received signal {}. Exiting gracefully.".format(signum))
        self.keep_running = False
        exit(0)
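
# Worker-side helper: runs one handling rule against a Connect cluster and
# updates the watcher health counters; the cluster's EMF metrics are published
# when enabled.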
def process_error_rules(
    handling_rule, connect_cluster: ConnectCluster, watcher: Watcher
):
    try:
        handling_rule.execute(connect_cluster)
        watcher.metrics["connect_clusters_healthy"] += 1
    except Exception as error:
        watcher.metrics["connect_clusters_unhealthy"] += 1
        LOG.exception(error)
        LOG.error(f"Failed to process the cluster {connect_cluster.name}")
    try:
        if connect_cluster.emf_config:
            publish_clusters_emf(connect_cluster)
    except Exception as error:
        LOG.exception(error)
        LOG.error(f"Failed to export EMF metrics for cluster {connect_cluster.name}")
def process_cluster(queue: Queue):
    while FOREVER:
        if not queue.empty():
            watcher, config, connect_cluster = queue.get()
            if connect_cluster is None:
                break
            for handling_rule in connect_cluster.handling_rules:
                process_error_rules(handling_rule, connect_cluster, watcher)
            queue.task_done()
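

# --- Usage sketch (illustrative, not part of the original module) -----------
# A minimal way to drive the watcher; the Config constructor signature below
# (a path to the watcher YAML configuration file) is an assumption, not taken
# from this module.
if __name__ == "__main__":
    from kafka_connect_watcher.config import Config

    watcher = Watcher()
    watcher.run(Config("watcher-config.yaml"))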