Source code for validations_libs.cli.common
#!/usr/bin/env python
#   Copyright 2021 Red Hat, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.
from argparse import ArgumentDefaultsHelpFormatter
from cliff import _argparse
import json
import logging
from prettytable import PrettyTable
import re
import sys
import time
import threading
import yaml
try:
    from junit_xml import TestSuite, TestCase, to_xml_report_string
    JUNIT_XML_FOUND = True
except ImportError:
    JUNIT_XML_FOUND = False
from validations_libs.cli import colors
from validations_libs import utils
# Handle backward compatibility for Cliff 2.16.0 in stable/train:
if hasattr(_argparse, 'SmartHelpFormatter'):
    from cliff._argparse import SmartHelpFormatter
else:
    from cliff.command import _SmartHelpFormatter as SmartHelpFormatter
[docs]def print_dict(data):
    """Print table from python dict with PrettyTable"""
    table = PrettyTable(border=True, header=True, padding_width=1)
    # Set Field name by getting the result dict keys
    try:
        table.field_names = data[0].keys()
        table.align = 'l'
    except IndexError:
        raise IndexError()
    for row in data:
        if row.get('Status_by_Host'):
            hosts = []
            for host in row['Status_by_Host'].split(', '):
                try:
                    _name, _status = host.split(',')
                except ValueError:
                    # if ValueError, then host is in unknown state:
                    _name = host
                    _status = 'UNKNOWN'
                _name = colors.color_output(_name, status=_status)
                hosts.append(_name)
            row['Status_by_Host'] = ', '.join(hosts)
        if row.get('Status'):
            status = row.get('Status')
            row['Status'] = colors.color_output(status, status=status)
        table.add_row(row.values())
    print(table) 
[docs]def write_output(output_log, results):
    """Write output log file as Json format"""
    with open(output_log, 'w') as output:
        output.write(json.dumps({'results': results}, indent=4,
                                sort_keys=True)) 
[docs]def write_junitxml(output_junitxml, results):
    """Write output file as JUnitXML format"""
    if not JUNIT_XML_FOUND:
        log = logging.getLogger(__name__ + ".write_junitxml")
        log.warning('junitxml output disabled: the `junit_xml` python module '
                    'is missing.')
        return
    test_cases = []
    duration_re = re.compile('([0-9]+):([0-9]+):([0-9]+).([0-9]+)')
    for vitem in results:
        if vitem.get('Validations'):
            parsed_duration = 0
            test_duration = vitem.get('Duration', '')
            matched_duration = duration_re.match(test_duration)
            if matched_duration:
                parsed_duration = (int(matched_duration[1])*3600
                                   + int(matched_duration[2])*60
                                   + int(matched_duration[3])
                                   + float('0.{}'.format(matched_duration[4])))
            test_stdout = vitem.get('Status_by_Host', '')
            test_case = TestCase('validations', vitem['Validations'],
                                 parsed_duration, test_stdout)
            if vitem['Status'] == 'FAILED':
                test_case.add_failure_info('FAILED')
            test_cases.append(test_case)
    ts = TestSuite("Validations", test_cases)
    with open(output_junitxml, 'w') as output:
        output.write(to_xml_report_string([ts])) 
[docs]def read_cli_data_file(data_file):
    """Read CLI data (YAML/JSON) file.
    :param data_file: Path to the requested file.
    :type data_file: ``path like``
    :returns: Parsed YAML/JSON file
    :rtype: ``dict``
    :raises: RuntimeError if the file doesn't exist or is malformed.
    """
    try:
        with open(data_file, 'r') as _file:
            return yaml.safe_load(_file.read())
    except (yaml.YAMLError, IOError) as error:
        error_msg = (
            "The file {} must be properly formatted YAML/JSON."
            "Details: {}.").format(data_file, error)
        raise RuntimeError(error_msg) 
[docs]class Spinner(object):
    """Animated spinner to indicate activity during processing"""
    busy = False
    delay = 0.1
[docs]    @staticmethod
    def spinning_cursor():
        while 1:
            for cursor in '|/-\\':
                yield cursor 
    def __init__(self, delay=None):
        self.spinner_generator = self.spinning_cursor()
        if delay and float(delay):
            self.delay = delay
[docs]    def spinner_task(self):
        while self.busy:
            sys.stdout.write(next(self.spinner_generator))
            sys.stdout.flush()
            time.sleep(self.delay)
            sys.stdout.write('\b')
            sys.stdout.flush() 
    def __enter__(self):
        self.busy = True
        threading.Thread(target=self.spinner_task).start()
    def __exit__(self, exception, value, tb):
        self.busy = False
        time.sleep(self.delay)
        if exception is not None:
            return False