Source code for kafka_connect_watcher.connectors_eval

#   SPDX-License-Identifier: MPL-2.0
#   Copyright 2023 John "Preston" Mille <john@ews-network.net>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from multiprocessing import Queue

from multiprocessing import Queue

from kafka_connect_api.errors import GenericNotFound

from kafka_connect_watcher.logger import LOG


def evaluate_connector_status(queue: Queue) -> None:
    """
    Drain ``queue``, evaluating the state of every connector found in it.

    Each queue item is expected to be a 7-tuple::

        (evaluation_rule, connect, connector, running_connectors,
         paused_connectors, unassigned_connectors, connectors_to_fix)

    For every processed item, the per-state task counts of ``connector`` are
    stored in ``connect.metrics["connectors"][connector.name]``.
    The connector is appended to ``connectors_to_fix`` when it is ``RUNNING``
    with at least one task that is not running, when its state is not one of
    ``RUNNING`` / ``PAUSED`` / ``UNASSIGNED``, or when evaluating its status
    raises ``GenericNotFound``.

    The function returns once the queue is empty, or as soon as an item whose
    ``connect`` is ``None`` is received (stop sentinel). ``task_done()`` is
    called exactly once for every item fetched, including the sentinel and
    items whose evaluation raised, so that a ``join()`` on the queue cannot be
    left waiting on an item that was fetched but never acknowledged.

    :param queue: Work queue. It must provide ``get_nowait()`` and
        ``task_done()`` (for example ``queue.Queue`` or
        ``multiprocessing.JoinableQueue``).
    """
    # Function-scope import: both queue.Queue and the multiprocessing queues
    # raise queue.Empty from get_nowait().
    from queue import Empty

    while True:
        # A single non-blocking get replaces the empty()/get() pair: with
        # several workers, another one could take the last item between the
        # two calls and leave this worker blocked in get() forever.
        try:
            item = queue.get_nowait()
        except Empty:
            break

        try:
            (
                evaluation_rule,
                connect,
                connector,
                running_connectors,
                paused_connectors,
                unassigned_connectors,
                connectors_to_fix,
            ) = item
            if connect is None:
                # Stop sentinel; the ``finally`` below still acknowledges it.
                break

            # Read the task list once and count every state in a single pass.
            tasks = connector.tasks
            connector_metrics: dict = {
                "tasks": len(tasks),
                "running": 0,
                "failed": 0,
                "unassigned": 0,
            }
            metric_for_state = {
                "RUNNING": "running",
                "FAILED": "failed",
                "UNASSIGNED": "unassigned",
            }
            for _task in tasks:
                metric = metric_for_state.get(_task.state)
                if metric is not None:
                    connector_metrics[metric] += 1

            # NOTE(review): the three counters below come out of the queue
            # item. If they are plain ints, ``+= 1`` only rebinds the local
            # name and the producer never sees the new value - confirm how
            # these totals are meant to be reported.
            try:
                state = connector.state
                if state == "RUNNING":
                    if all(task.is_running for task in connector.tasks):
                        running_connectors += 1
                    else:
                        connectors_to_fix.append(connector)
                elif state == "PAUSED":
                    paused_connectors += 1
                    # NOTE(review): this cycles the connector when
                    # ``ignore_paused`` is truthy, the opposite polarity of
                    # the UNASSIGNED branch (``not ignore_unassigned``).
                    # Behaviour kept as-is; confirm the intent.
                    if evaluation_rule.ignore_paused:
                        connector.cycle_connector()
                elif state == "UNASSIGNED":
                    unassigned_connectors += 1
                    if not evaluation_rule.ignore_unassigned:
                        connector.cycle_connector()
                else:
                    connectors_to_fix.append(connector)
            except GenericNotFound as error:
                LOG.debug(
                    "Connector {} not found in connect cluster. {}".format(
                        connector.name,
                        error,
                    )
                )
                LOG.error(
                    "{} - {}: failed to retrieve status".format(
                        connect.name, connector.name
                    )
                )
                unassigned_connectors += 1
                connectors_to_fix.append(connector)

            connect.metrics["connectors"].update({connector.name: connector_metrics})
        finally:
            # One acknowledgement per fetched item, whatever path was taken.
            queue.task_done()