from __future__ import annotations
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from dkutils.constants import API_GET
from dkutils.datakitchen_api.datakitchen_client import DataKitchenClient
from dkutils.decorators import retry_50X_httperror
from events_ingestion_client import (
ApiClient,
Configuration,
EventsApi,
MessageLogEventApiSchema,
RunStatusApiSchema,
TestOutcomeItem,
TestOutcomesApiSchema,
)
from events_ingestion_client.rest import ApiException
# Module-level logger used by every function and class in this module.
logger = logging.getLogger(__name__)

# Default base URL of the Events Ingestion API (default ``host`` of OrderRunMonitor).
DEFAULT_HOST = 'https://dev-api.datakitchen.io'

# Node status strings. Node compares the 'status' field of a node's info against
# NODE_RUNNING, NODE_SUCCESSFULL, NODE_STOPPED and NODE_FAILED; the remaining values are not
# referenced in this module (presumably other statuses the platform can report - TODO confirm).
NODE_UNKNOWN = 'DKNodeStatus_Unknown'
NODE_TEMPLATE = 'DKNodeStatus_Template'
NODE_IN_CONFIGURATION = 'DKNodeStatus_in_configuration'
NODE_FULLY_CONFIGURED = 'DKNodeStatus_fully_configured'
NODE_NOT_RUN = 'DKNodeStatus_ready_for_production'
NODE_RUNNING = 'DKNodeStatus_in_production'
# NOTE: the name is misspelled (sic) but is kept as-is because it is a module-level name.
NODE_SUCCESSFULL = 'DKNodeStatus_completed_production'
NODE_FAILED = 'DKNodeStatus_production_error'
NODE_STOPPED = 'DKNodeStatus_production_stopped'
NODE_SKIPPED = 'DKNodeStatus_Skipped'

# Log record types that OrderRunMonitor.process_log_entries forwards as message log events.
LOG_LEVELS_TO_REPORT = ['WARNING', 'ERROR', 'CRITICAL']
# Log entry keys that OrderRunMonitor.parse_log_entry copies into the event metadata when they
# are present and not None.
LOG_METADATA_KEYS_TO_REPORT = ['exc_desc', 'exc_type', 'traceback']
# Upper-cased test statuses that Node._extract_tests reports; tests with any other status are
# left out of the test outcomes.
ALLOWED_TEST_STATUS_TYPES = ['PASSED', 'FAILED', 'WARNING']
@retry_50X_httperror()
def get_customer_code(dk_client: DataKitchenClient) -> str:
    """
    Look up the customer code of the user the provided DataKitchen client is authenticated as.

    Parameters
    ----------
    dk_client: DataKitchenClient
        Client for making requests to the DataKitchen platform API.

    Returns
    -------
    str
        Customer code (the ``customer_git_name`` field of the ``userinfo`` response) -
        typically two or three letters.
    """
    response = dk_client._api_request(API_GET, 'userinfo')
    payload = response.json()
    return payload['customer_git_name']
def get_order_run_url(dk_client: DataKitchenClient, customer_code: str, order_run_id: str) -> str:
    """
    Build the URL for navigating to the Order Run Details page in the DataKitchen platform for
    the provided order_run_id.

    Parameters
    ----------
    dk_client: DataKitchenClient
        Client for making requests to the DataKitchen platform API. Its base URL and kitchen
        name are used to construct the URL.
    customer_code: str
        Customer code required for constructing the URL
    order_run_id: str
        Order run id the URL will link to

    Returns
    -------
    str
        URL of the form ``<base_url>/#/orders/<customer_code>/<kitchen>/runs/<order_run_id>``.
    """
    url_parts = [
        dk_client._base_url,
        '#',
        'orders',
        customer_code,
        dk_client.kitchen,
        'runs',
        order_run_id,
    ]
    # URL segments are always separated by forward slashes. os.sep is the filesystem path
    # separator and would produce backslashes on Windows.
    return '/'.join(url_parts)
@retry_50X_httperror()
def get_ingredient_owner_order_run_id(dk_client: DataKitchenClient) -> str | None:
    """
    If this order run is for an ingredient, then return the parent order run id. Otherwise, return
    None.

    Parameters
    ----------
    dk_client: DataKitchenClient
        Client for making requests to the DataKitchen platform API.

    Returns
    -------
    str or None
        Return the parent order run id if the current order run is for an ingredient, otherwise
        return None. None is also returned when the order status response contains no orders
        or lacks any of the expected fields.
    """
    try:
        order_run_status = dk_client._api_request('get', 'order/status', dk_client.kitchen).json()
        # If this order run is for an ingredient, then it's in an ingredient kitchen with a single
        # order and order run and the status should contain an ingredient_owner_order_run field.
        return order_run_status['orders'][0]['input_settings']['ingredient_owner_order_run']
    except (KeyError, IndexError):
        # KeyError: one of the expected fields is missing. IndexError: the list of orders is
        # empty. Either way there is no ingredient owner to report.
        return None
class RunStatus(Enum):
    """
    Statuses used when publishing run status events.

    The member *name* is what gets sent as the ``status`` of a run status event. Within this
    module only RUNNING, COMPLETED and FAILED are published.
    """

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    COMPLETED_WITH_WARNINGS = "COMPLETED_WITH_WARNINGS"
    FAILED = "FAILED"
@dataclass
class EventInfoProvider:
    """
    Builds the dictionary of fields shared by every event published for one order run.

    Attributes
    ----------
    dk_client : DataKitchenClient
        Client used to construct the external URL of the order run.
    customer_code : str
        Customer code used to construct the external URL of the order run.
    pipeline_key : str
        Value reported as ``pipeline_key`` in every event.
    order_run_id : str
        Order run id; reported as ``run_key`` in every event.
    run_name : str or None
        Optional run name; reported as ``run_name`` when truthy.
    """

    dk_client: DataKitchenClient
    customer_code: str
    pipeline_key: str
    order_run_id: str
    run_name: str | None = None

    @classmethod
    def init(
        cls,
        dk_client: DataKitchenClient,
        pipeline_key: str,
        order_run_id: str,
        run_name: str | None = None,
    ) -> EventInfoProvider:
        """
        Alternate constructor that retrieves the customer code via the provided client.

        Parameters
        ----------
        dk_client : DataKitchenClient
            Client for making requests to the DataKitchen platform API.
        pipeline_key : str
            Value reported as ``pipeline_key`` in every event.
        order_run_id : str
            Order run id the events belong to.
        run_name : str or None, optional
            Optional run name (default: None).

        Returns
        -------
        EventInfoProvider
            A new instance of ``cls`` (so subclasses get an instance of their own type).
        """
        customer_code = get_customer_code(dk_client)
        return cls(dk_client, customer_code, pipeline_key, order_run_id, run_name)

    def get_event_info(self, **kwargs) -> dict:
        """
        Assemble the fields for an event.

        Parameters
        ----------
        kwargs
            Additional event fields. ``event_timestamp`` and ``external_url`` are used as given
            when provided, otherwise they are filled in with the current UTC time and the order
            run URL respectively.

        Returns
        -------
        dict
            Event fields: ``pipeline_key``, ``run_key``, the provided kwargs, ``run_name``
            (only when this provider has a truthy run name, in which case it overrides a
            ``run_name`` kwarg), ``event_timestamp`` and ``external_url``.
        """
        event_info = {'pipeline_key': self.pipeline_key, 'run_key': self.order_run_id, **kwargs}
        if self.run_name:
            event_info['run_name'] = self.run_name
        if 'event_timestamp' not in event_info:
            event_info['event_timestamp'] = datetime.utcnow().isoformat()
        # Checked explicitly (rather than via setdefault) so that the URL is only built when
        # the caller did not supply one.
        if 'external_url' not in event_info:
            event_info['external_url'] = get_order_run_url(
                self.dk_client, self.customer_code, self.order_run_id
            )
        return event_info
@dataclass
class Node:
    """
    Tracks one node of an order run and publishes run status and test outcome events for it.

    Attributes
    ----------
    events_api_client : EventsApi
        Client used to post events.
    event_info_provider : EventInfoProvider
        Provider of the event fields shared by all events of the order run.
    name : str
        Node name; reported as the ``task_key`` of the events.
    info : dict or None
        Most recent node details passed to :meth:`update`.
    started_event_published : bool
        Whether a RUNNING event has been published for this node.
    status : str or None
        Most recent node status (the ``status`` field of ``info``).
    start_time : int or None
        Start time in milliseconds from epoch.
    end_time : int or None
        End time in milliseconds from epoch.
    """

    events_api_client: EventsApi
    event_info_provider: EventInfoProvider
    name: str
    info: dict | None = None
    started_event_published: bool = False
    status: str | None = None
    start_time: int | None = None
    end_time: int | None = None

    @property
    def running(self) -> bool:
        """True if the node status is NODE_RUNNING."""
        return self.status == NODE_RUNNING

    @property
    def succeeded(self) -> bool:
        """True if the node status is NODE_SUCCESSFULL."""
        return self.status == NODE_SUCCESSFULL

    @property
    def stopped(self) -> bool:
        """True if the node status is NODE_STOPPED."""
        return self.status == NODE_STOPPED

    @property
    def failed(self) -> bool:
        """True if the node status is NODE_FAILED."""
        return self.status == NODE_FAILED

    def update(self, info: dict) -> Node:
        """
        Record the latest node details and publish events if the node status changed.

        Parameters
        ----------
        info : dict
            Node details. Must contain ``status``; ``start_time`` and ``timing`` are read when
            the status differs from the previously recorded one.

        Returns
        -------
        Node
            This node, so that creation and the first update can be chained.
        """
        self.info = info
        prev_status = self.status
        self.status = info['status']
        if prev_status != self.status:
            self._update_timings()
            self._handle_event()
        return self

    def _update_timings(self) -> None:
        """Derive start_time and end_time (milliseconds from epoch) from the node details."""
        start_time = self.info['start_time']
        timing = self.info['timing']
        if start_time:
            self.start_time = start_time
        elif self.start_time is None:
            # No start time reported and none recorded earlier: fall back to the current time.
            self.start_time = int(time.time() * 1000)
        # Without a reported timing the end time collapses onto the start time.
        self.end_time = self.start_time + timing if timing else self.start_time

    def _handle_event(self) -> None:
        """Publish the run status event(s) matching the current node status, if any."""
        if self.running:
            self._publish_started_event()
            return

        if self.succeeded or self.stopped:
            final_status = RunStatus.COMPLETED
        elif self.failed:
            final_status = RunStatus.FAILED
        else:
            # Any other status does not produce an event.
            return

        # A node may reach a final status without ever being observed as running; make sure
        # the RUNNING event always precedes the final one.
        if not self.started_event_published:
            self._publish_started_event()
        self._publish_run_status_event(final_status, self.end_time)

    def _publish_started_event(self) -> None:
        """Publish the RUNNING event for this node and remember that it has been sent."""
        self._publish_run_status_event(RunStatus.RUNNING, self.start_time)
        self.started_event_published = True

    def _publish_run_status_event(
        self, run_status: RunStatus, milliseconds_from_epoch: int
    ) -> None:
        """
        Post a run status event for this node.

        Parameters
        ----------
        run_status : RunStatus
            Status to report; its name is sent as the event ``status``.
        milliseconds_from_epoch : int
            Event time in milliseconds from epoch.

        Raises
        ------
        ApiException
            Re-raised after being logged when posting the event fails.
        """
        try:
            event_timestamp = datetime.utcfromtimestamp(milliseconds_from_epoch / 1000).isoformat()
            event_info = self.event_info_provider.get_event_info(
                task_key=self.name, status=run_status.name, event_timestamp=event_timestamp
            )
            logger.info(f'Publishing event: {event_info}')
            self.events_api_client.post_run_status(RunStatusApiSchema(**event_info))
        except ApiException as e:
            logger.error(f'Exception when calling EventsApi->post_run_status: {str(e)}\n')
            raise

    def publish_tests(self) -> None:
        """
        Post a test outcomes event with the reportable tests of this node, if there are any.

        An ApiException raised while posting is logged and not re-raised.
        """
        test_reports = self._get_test_reports()
        if not test_reports:
            return
        try:
            event_info = self.event_info_provider.get_event_info(
                task_key=self.name, test_outcomes=test_reports
            )
            self.events_api_client.post_test_outcomes(TestOutcomesApiSchema(**event_info))
        except ApiException as e:
            logger.error(f'Exception when calling EventsApi->post_test_outcomes: {str(e)}\n')

    def _extract_tests(self, tests: dict) -> list:
        """
        Convert a dictionary of tests into TestOutcomeItem objects.

        Parameters
        ----------
        tests : dict
            Dictionary keyed by test name and valued by a dictionary with ``status`` and
            ``results`` entries.

        Returns
        -------
        list
            One TestOutcomeItem per test whose upper-cased status is in
            ALLOWED_TEST_STATUS_TYPES.
        """
        test_reports = []
        for name, test in tests.items():
            status = test['status'].upper()
            if status in ALLOWED_TEST_STATUS_TYPES:
                test_reports.append(
                    TestOutcomeItem(description=test['results'], name=name, status=status)
                )
        return test_reports

    def _get_test_reports(self) -> list:
        """
        Collect the reportable tests of the node itself and of its data sources, data sinks and
        actions (each of which is optional in the node details).
        """
        test_reports = self._extract_tests(self.info['tests'])
        for key in ('data_sources', 'data_sinks', 'actions'):
            if key in self.info:
                for value in self.info[key].values():
                    test_reports.extend(self._extract_tests(value['tests']))
        return test_reports
class OrderRunMonitor:
    def __init__(
        self,
        dk_client: DataKitchenClient,
        events_api_key: str,
        pipeline_name: str,
        order_run_id: str,
        run_name: str | None = None,
        nodes_to_ignore: list | None = None,
        sleep_time_secs: int = 10,
        host: str = DEFAULT_HOST,
    ):
        """
        This class is for use in monitoring a DataKitchen Order Run and reporting its status to
        DataKitchen's Observability module Events Ingestion API. It will report the start/stop of
        each node as an individual task and close the run when finished.

        Parameters
        ----------
        dk_client : DataKitchenClient
            Client for making requests to the DataKitchen platform API.
        events_api_key : str
            Events Ingestion API key.
        pipeline_name : str
            Name of the pipeline being monitored
        order_run_id : str
            Id of the Order Run being monitored.
        run_name : str, optional
            Human readable name for the pipeline execution being monitored (default: None).
        nodes_to_ignore : list or None, optional
            List of nodes to ignore. If the monitor node is named Order_Run_Monitor, it is added to
            the ignore list by default and there is no need to add it here (default: None). The
            provided list is copied and never modified.
        sleep_time_secs : int, optional
            Polling interval for monitoring the run in seconds (default: 10).
        host : str, optional
            URL of the Events Ingestion API.
        """
        self._dk_client = dk_client
        self.is_ingredient_order_run = False
        ingredient_owner_order_run_id = get_ingredient_owner_order_run_id(dk_client)
        if ingredient_owner_order_run_id is not None:
            logger.info(f'Ingredient order run originated from {ingredient_owner_order_run_id}')
            self.is_ingredient_order_run = True

        self._event_info_provider = EventInfoProvider.init(
            dk_client, pipeline_name, order_run_id, run_name
        )
        self._order_run_id = order_run_id

        # Copy the caller's list: extending it in place would leak the monitor node and the
        # conditional nodes into the list the caller passed in.
        self._nodes_to_ignore = list(nodes_to_ignore) if nodes_to_ignore is not None else []
        self._nodes_to_ignore.append('Order_Run_Monitor')
        self._nodes_to_ignore.extend(self.get_conditional_nodes())
        self._sleep_time_secs = sleep_time_secs

        # Configure API key authorization: SAKey
        configuration = Configuration()
        configuration.api_key['ServiceAccountAuthenticationKey'] = events_api_key
        configuration.host = host

        # Create an instance of the API class
        self._events_api_client = EventsApi(ApiClient(configuration))

    @retry_50X_httperror()
    def get_order_run_details(self, **kwargs) -> dict:
        """
        Retrieve order run details for the associated order run. The provided kwargs may be used to
        augment the returned value with more granular details.

        Parameters
        ----------
        kwargs
            Optional keyword arguments as found in DataKitchenClient's
            :func:`~dkutils.datakitchen_api.datakitchen_client.DataKitchenClient.get_order_run_details`

        Returns
        -------
        dict
            Dictionary of order run details
        """
        return self._dk_client.get_order_run_details(self._order_run_id, **kwargs)

    def get_conditional_nodes(self) -> list:
        """
        Retrieve a list of the conditional node names present in this Order Run.

        Returns
        -------
        list
            List of distinct conditional node names (in no particular order). Empty if the order
            run details contain no ``conditions``.
        """
        conditions = self.get_order_run_details().get('conditions', {})
        return list({condition['node'] for condition in conditions.values()})

    def get_nodes_info(self) -> dict:
        """
        Extract and return the node information from the order run details, excluding the nodes that
        should be ignored (e.g. conditional nodes and the Order_Run_Monitor node itself).

        Returns
        -------
        dict
            Dictionary keyed by node name and valued by a dictionary of node details.
        """
        nodes_info = self.get_order_run_details(include_summary=True)['summary']['nodes']
        for node_name in self._nodes_to_ignore:
            # Ignored nodes that are not part of this order run are silently skipped.
            nodes_info.pop(node_name, None)
        return nodes_info

    def _create_node(self, name: str, info: dict) -> Node:
        """
        Create a Node object and initialize it with its first update.

        Parameters
        ----------
        name : str
            Node name
        info : dict
            Node details as found in the order run summary. The Node reads its ``status``,
            ``start_time`` and ``timing`` entries.

        Returns
        -------
        Node
        """
        return Node(self._events_api_client, self._event_info_provider, name).update(info)

    @staticmethod
    def parse_log_entry(log_entry: dict) -> dict:
        """
        Parse a log entry dictionary to derive the fields required for the
        MessageLogEventApiSchema.

        Parameters
        ----------
        log_entry : dict
            Dictionary of log details for a single log entry of the form::

                {
                    'datetime': '2022-08-16T14:38:58.611000',
                    'disk_used': '5.8984375 MB',
                    'exc_desc': None,
                    'exc_type': None,
                    'hostname': '0d4e31d0-1d9b-11ed-971d-621c363ef06a-lqq9t',
                    'mem_usage': '127.33 MB',
                    'message': 'Test Fail: DKDataTestFailed',
                    'node': 'Fail_Node',
                    'order_run_id': '115d3e42-1d9b-11ed-b495-c216e4cc8e61',
                    'pid': 22,
                    'priority': 27,
                    'record_type': 'ERROR',
                    'syslogts': '2022-08-16T14:38:58-05:00',
                    'thread_name': 'NodeExecutorThread:0',
                    'traceback': None
                }

        Returns
        -------
        dict
            Dictionary of required and optional fields for the MessageLogEventApiSchema
        """
        # Permitted log levels: "ERROR", "WARNING", and "INFO" - CRITICAL is reported as ERROR.
        record_type = log_entry['record_type']
        log_level = 'ERROR' if record_type == 'CRITICAL' else record_type

        # Only metadata keys that are present with a non-None value are reported.
        metadata = {
            key: log_entry[key]
            for key in LOG_METADATA_KEYS_TO_REPORT
            if log_entry.get(key) is not None
        }
        return {
            'event_timestamp': log_entry['syslogts'],
            'log_level': log_level,
            'message': log_entry['message'],
            'metadata': metadata,
            'task_key': log_entry['node'],
        }

    def process_log_entries(self) -> None:
        """
        Send MessageLog events for the log messages whose record type is in LOG_LEVELS_TO_REPORT
        (WARNING, ERROR and CRITICAL), skipping messages of ignored nodes.

        This is best effort: a failure to post a single message is logged and the remaining
        messages are still processed; any other failure is logged and ends the processing.
        """
        try:
            for log_entry in self.get_order_run_details(include_logs=True)['log']['lines']:
                if log_entry['record_type'] not in LOG_LEVELS_TO_REPORT:
                    continue
                if log_entry['node'] in self._nodes_to_ignore:
                    continue
                try:
                    event_info = self._event_info_provider.get_event_info(
                        **self.parse_log_entry(log_entry)
                    )
                    body = MessageLogEventApiSchema(**event_info)
                    self._events_api_client.post_message_log(body)
                except ApiException as e:
                    logger.error(f'Exception when calling EventsApi->post_message_log: {str(e)}')
        except Exception as e:
            # Deliberately broad: reporting logs must never prevent the run from being closed.
            logger.error(f'Failed to process logs: {str(e)}')

    def monitor(self) -> tuple:
        """
        Poll the DataKitchen platform API for the status of the associated Order Run. Report the
        status of each node until all the nodes have completed or if the run has failed and nodes
        stopped processing. If this order run is for an ingredient, monitoring is disabled.

        When monitoring ends - normally or because of an exception - the log entries and test
        outcomes are reported and the run is closed as FAILED if any node was last seen failed,
        otherwise as COMPLETED. An exception raised while monitoring propagates to the caller
        after that.

        Returns
        -------
        tuple
            Contains two lists. The first list contains names of the nodes that succeeded, whereas
            the second list contains names of the nodes that failed. Both are empty if this is an
            ingredient order run.
        """
        if self.is_ingredient_order_run:
            logger.info('This is an ingredient order run - disabling monitoring.')
            return [], []

        nodes = []
        failed_nodes = []
        try:
            nodes_info = self.get_nodes_info()
            nodes = [self._create_node(name, info) for name, info in nodes_info.items()]
            nodes_are_running = True
            while nodes_are_running:
                # Update all the nodes with the latest run info
                for node in nodes:
                    node.update(nodes_info[node.name])
                # NOTE(review): polling ends as soon as no node reports the running status -
                # confirm the platform never reports pending nodes while none is running.
                nodes_are_running = any(node.running for node in nodes)
                if nodes_are_running:
                    time.sleep(self._sleep_time_secs)
                    nodes_info = self.get_nodes_info()
            logger.info('Order run finished. Shutting Down...')
        finally:
            # Derive the failures from the last known node states so that a run with failed
            # nodes is not closed as COMPLETED when monitoring was cut short by an exception.
            failed_nodes = [node.name for node in nodes if node.failed]
            self.process_log_entries()
            for node in nodes:
                node.publish_tests()
            run_status = RunStatus.FAILED if failed_nodes else RunStatus.COMPLETED
            self._events_api_client.post_run_status(
                RunStatusApiSchema(
                    status=run_status.name, **self._event_info_provider.get_event_info()
                )
            )
        successful_nodes = [node.name for node in nodes if node.succeeded]
        return successful_nodes, failed_nodes