Source code for kafka_connect_watcher.aws_emf

#   SPDX-License-Identifier: MPL-2.0
#   Copyright 2023 John "Preston" Mille <john@ews-network.net>

"""
AWS EMF Publishing management for cluster & connectors
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_connect_watcher.cluster import ConnectCluster
    from kafka_connect_watcher.config import Config
    from kafka_connect_watcher.watcher import Watcher

from asyncio import get_event_loop, new_event_loop, set_event_loop
from copy import deepcopy

from aws_embedded_metrics import metric_scope
from aws_embedded_metrics.config import get_config

from kafka_connect_watcher.logger import LOG

emf_config = get_config()


[docs]def init_emf_config(config: Config) -> None: try: loop = get_event_loop() except RuntimeError: loop = new_event_loop() set_event_loop(loop) emf_config.service_name = config.emf_service_name emf_config.service_type = config.emf_service_type emf_config.log_group_name = config.emf_log_group
[docs]@metric_scope def publish_cluster_metrics(cluster: ConnectCluster, metrics) -> None: LOG.info( f"{cluster.name} - Publishing Cluster metrics to EMF with Resolution {cluster.emf_config.emf_resolution}", ) LOG.debug(cluster.metrics) metrics.reset_dimensions(use_default=False) metrics.set_property("ConnectDetails", {"designation": cluster.name}) dimensions: dict = deepcopy(cluster.emf_config.dimensions) dimensions.update({"ConnectCluster": cluster.name}) metrics.put_dimensions(dimensions) for metric_name, value in cluster.metrics.items(): if not isinstance(value, (int, str)): continue metrics.put_metric(metric_name, value, None, cluster.emf_config.emf_resolution)
[docs]@metric_scope def publish_connector_metrics( cluster: ConnectCluster, connector_name, connector_metrics, metrics ) -> None: LOG.debug( f"Publishing Cluster Connector metrics to EMF with Resolution {cluster.emf_config.emf_resolution}" ) metrics.set_namespace(cluster.emf_config.namespace) metrics.reset_dimensions(use_default=False) metrics.set_property("ConnectDetails", {"designation": cluster.name}) dimensions: dict = deepcopy(cluster.emf_config.dimensions) dimensions.update({"ConnectorName": connector_name, "ConnectCluster": cluster.name}) metrics.put_dimensions(dimensions) for _conn_metric_name, _conn_metric_value in connector_metrics.items(): metrics.put_metric( _conn_metric_name, _conn_metric_value, None, cluster.emf_config.emf_resolution, )
[docs]def publish_clusters_emf(cluster: ConnectCluster) -> None: if not cluster.emf_config.enabled: return try: loop = get_event_loop() except RuntimeError: loop = new_event_loop() print("Publish EMF for clusters") set_event_loop(loop) publish_cluster_metrics(cluster) for connector_name, connector_metrics in cluster.metrics["connectors"].items(): publish_connector_metrics(cluster, connector_name, connector_metrics)
[docs]@metric_scope def publish_watcher_emf_metrics(config: Config, watcher: Watcher, metrics): LOG.info( f"Publishing Watcher metrics to EMF with Resolution {config.emf_watcher_config.emf_resolution}" ) LOG.debug(watcher.metrics) metrics.set_namespace(config.emf_watcher_config.namespace) metrics.reset_dimensions(use_default=False) metrics.put_dimensions(config.emf_watcher_config.dimensions) for _watcher_metric, _watcher_metric_value in watcher.metrics.items(): metrics.put_metric( _watcher_metric, _watcher_metric_value, None, config.emf_watcher_config.emf_resolution, )
[docs]def handle_watcher_emf(config: Config, watcher: Watcher) -> None: if not (config.emf_watcher_config and config.emf_watcher_config.enabled): LOG.debug("Watcher metrics to EMF disabled") return publish_watcher_emf_metrics(config, watcher)