#!/usr/bin/env python3
# -*- encoding: utf-8; py-indent-offset: 4 -*-
"""Functions used by multiple parts of the plugin."""

# (c) Michael Honkoop
# License: GNU General Public License v2

import datetime
import re

import pytz  # noqa: F401  (not referenced in this module any more; import kept on purpose)
from dateutil import tz

from cmk.agent_based.v2 import (
    DiscoveryResult,
    Service,
)

# Item-name prefixes for which discover_edirectory_items() yields no service.
ignored_items = [
    "IDM jvm_stats",
]

# NOTE(review): the attribute lists below are not referenced in this module;
# presumably they are imported by the check plugins - verify against callers.
uptime_attributes = [
    "Status_eDirectoryUpTime",
]

time_attributes = [
    "CheckPointThreadData_CheckPointThreadStartTime",
    "Status_eDirectorySystemCurrTime",
]

time_attributes_ignored = [
    "BackGroundProcInterval",
    "CheckPointThreadData_CheckPointThreadForceStartTime",
    "CheckPointThreadData_CheckPointThreadStartTime",
]

non_graphable_attributes = [
    "Status_eDirectorySystemCurrTime",
    "Status_eDirectoryUpTime",
    "Status_eDirectoryAgentVersion",
    "CheckPointThreadData_CheckPointThreadStartTime",
    "CheckPointThreadData_CheckPointThreadForceStartTime",
    "CheckPointThreadData_CheckPointThreadIsForced",
    "Size_CurrentTransactionID",
    "EngineVersion_EngineVersion",
    "driverSet_Stats_driverSetDN",
    "Driver_startOption",
    "Driver_type",
    "Driver_uptime",
    "Driver_driver-state",
    "Driver_DriverDN",
    "Finalizer_lock",
    "Finalizer_state",
    "Dispatcher_state",
    "DirXML_state",
    "Scheduler_lock",
    "Scheduler_state",
    "Scheduler_Id",
    "Handler_lock",
    "Handler_state",
    "IDM job_stats",
    "system_properties",
]

idm_nongraphable_attributes = [
    "arch",
    "average_load",
    "comitted",
    "configuration",
    "containment",
    "downTime",
    "DriverDN",
    "driver-state",
    "id",
    "initial",
    "JobDN",
    "lock",
    "name",
    "processors",
    "startOption",
    "state",
    "status",
    "total",
    "type",
    "uptime",
    "upTime",
    "used",
    "version",
]

total_counter_attributes = [
    "Bindings_simpleAuthBinds",
    "Bindings_unAuthBinds",
    "Bindings_bindSecurityErrors",
    "Bindings_strongAuthBinds",
    "CacheFaultLooks_BlockCache",
    "CacheFaultLooks_EntryCache",
    "CacheFaultLooks_Total",
    "Errors_errors",
    "Errors_securityErrors",
    "Hits_BlockCache",
    "Hits_EntryCache",
    "Hits_Total",
    "HitLooks_BlockCache",
    "HitLooks_EntryCache",
    "HitLooks_Total",
    "IncomingOperations_abandonOps",
    "IncomingOperations_addEntryOps",
    "IncomingOperations_compareOps",
    "IncomingOperations_extendedOps",
    "IncomingOperations_inOps",
    "IncomingOperations_listOps",
    "IncomingOperations_wholeSubtreeSearchOps",
    "IncomingOperations_oneLevelSearchOps",
    "IncomingOperations_searchOps",
    "IncomingOperations_modifyRDNOps",
    "IncomingOperations_modifyEntryOps",
    "IncomingOperations_removeEntryOps",
    "IncomingOperations_readOps",
    "OutgoingOperations_chainings",
    "OutBoundContext_ActiveOutBoundContextCount",
    "OutBoundContext_TotalOutBoundContextCount",
    "Thread_Stats_Total_Shared_Count",
    "TrafficVolume_inBytes",
    "TrafficVolume_outBytes",
]


def _format_attribute(key, raw_value):
    """Return ``"key=value"`` for one attribute value, or None if it is empty.

    Values offering ``.decode`` (bytes) are decoded as UTF-8; anything else is
    used as-is. The text is passed through clean_value() before formatting.
    """
    try:
        text = raw_value.decode("utf-8")
    except AttributeError:
        # No .decode(): the value is already text.
        text = raw_value
    if len(text) == 0:
        return None
    return f"{key}={clean_value(text)}"


def print_sections(raw_result):
    """Print the entries of *raw_result* as a single '|'-separated line.

    Each element of *raw_result* is handled according to its type:

    * ``list``: printed by a recursive call, i.e. on a line of its own that is
      emitted before the line of the enclosing call.
    * 2-tuple ``(dn, attributes)``: the cleaned DN followed by one
      ``key=value`` entry per non-empty attribute value. The ``objectclass``
      attribute and DNs containing ``BackGroundProcInterval`` are skipped.
    * anything else: handed to the nested ``process_item`` helper.

    Nothing is printed when no entry was collected.
    """
    separator = chr(124)  # '|'
    lines = []

    def process_item(item):
        """Append the textual form of a non-tuple item to ``lines``."""
        if isinstance(item, list):
            for sub_item in item:
                process_item(sub_item)
        elif isinstance(item, dict):
            for key, value in item.items():
                if isinstance(value, list):
                    # Preserve list information by joining the elements.
                    value_str = separator.join(value)
                else:
                    value_str = value
                lines.append(f"{key}={value_str}")
        elif "BackGroundProcInterval" not in str(item):
            # Everything mentioning BackGroundProcInterval is dropped.
            lines.append(str(item))

    for each_item in raw_result:
        if isinstance(each_item, list):
            print_sections(each_item)
        elif isinstance(each_item, tuple) and len(each_item) == 2:
            dn, attributes = each_item
            if "BackGroundProcInterval" in dn:
                continue
            lines.append(clean_key(dn))
            for key, value in attributes.items():
                if key == "objectclass":
                    continue
                # Treat scalar and list values alike. The former scalar branch
                # checked the length of a stale/undefined loop variable.
                values = value if isinstance(value, list) else [value]
                for raw_value in values:
                    entry = _format_attribute(key, raw_value)
                    if entry is not None:
                        lines.append(entry)
        else:
            process_item(each_item)

    if lines:
        print(separator.join(lines))


def clean_value(value: str) -> str:
    """Return *value* with every ' Bytes', ' KB', ' MB' and ' ms' removed."""
    return re.sub(r" Bytes| KB| MB| ms", "", value)


def clean_key(key: str) -> str:
    """Turn a comma-separated DN into a space-separated name, last part first.

    The comma-separated parts are reversed and joined with spaces, then
    'cn=Monitor' and every remaining 'cn=' are removed and surrounding
    whitespace is stripped.
    """
    reversed_parts = " ".join(key.split(",")[::-1])
    return reversed_parts.replace("cn=Monitor", "").replace("cn=", "").strip()


def format_partition_agent(value: str) -> list[str]:
    """Strip unwanted data from an 'Agent Partition' entry.

    Removes 'CN=', 'OU=', 'O=' and 'T=', then every remaining '=' and every
    space, and splits the result on '#'.
    """
    stripped = re.sub(r"CN=|OU=|O=|T=", "", value)
    return stripped.replace("=", " ").replace(" ", "").split("#")


def format_partition_data(value: str) -> list[str]:
    """Strip unwanted data from a generic ``key=value`` entry.

    Removes 'CN=', 'OU=' and 'O=', replaces ',' by '.', and splits the result
    on '='.
    """
    stripped = re.sub(r"CN=|OU=|O=", "", value)
    return stripped.replace(",", ".").split("=")


def convert_timestamp(value) -> datetime.datetime:
    """Convert a Zulu timestamp (``YYYYmmddHHMMSSZ``) to local time.

    Returns a timezone-aware datetime in the local timezone. Raises ValueError
    (from ``strptime``) when *value* does not match the expected format.
    """
    utc_dt = datetime.datetime.strptime(str(value), "%Y%m%d%H%M%SZ")
    utc_dt = utc_dt.replace(tzinfo=tz.UTC)
    return utc_dt.astimezone(tz.tzlocal())


def parse_ldap_data(string_table):
    """Parse the lines of *string_table* into a nested dictionary.

    The first field of every line is the item name; each further field is
    split into a key and a value by format_partition_agent() (for the item
    "Agent Partition") or format_partition_data() (all other items). The
    first value seen for an item/key pair wins. Empty lines are skipped.

    Returns a dict mapping item name -> {key: value}.
    """
    parsed = {}
    for line in string_table:
        if not line:
            continue
        item = line[0]
        item_data = parsed.setdefault(item, {})
        for data in line[1:]:
            # NOTE(review): both formatters must yield exactly two parts,
            # otherwise this unpacking raises ValueError.
            if item == "Agent Partition":
                key, value = format_partition_agent(data)
            else:
                key, value = format_partition_data(data)
            item_data.setdefault(key, value)
    return parsed


def discover_edirectory_items(section) -> DiscoveryResult:
    """Discover one service per section key.

    Empty keys and keys starting with one of the ``ignored_items`` prefixes
    yield no service.
    """
    ignored_prefixes = tuple(ignored_items)
    for key in section:
        if key and not key.startswith(ignored_prefixes):
            yield Service(item=key)