commit 3837b3b6e29063072aae818e3d23e40ca9c1499f Author: David Moreau Simard Date: Tue Sep 1 14:37:09 2020 -0400 callback: push non-blocking calls to threads There are some parts of the callback that need to be synchronous because they rely on an object that must be created before, for example. However, for the parts that don't need to run synchronously, thread them so we don't block Ansible from running. This results in a dramatic performance increase. Related: https://github.com/ansible-community/ara/issues/171 Change-Id: I4317d1d1175a5286704b04f57de9ef73fa911a3a diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index aa5b04f..af2fc05 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -21,6 +21,7 @@ import datetime import json import logging import os +from concurrent.futures import ThreadPoolExecutor from ansible import __version__ as ansible_version from ansible.parsing.ajson import AnsibleJSONEncoder @@ -175,6 +176,13 @@ class CallbackModule(CallbackBase): super(CallbackModule, self).__init__() self.log = logging.getLogger("ara.plugins.callback.default") self.client = None + # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter. + # In the meantime default to 10 because that's the value of requests.adapters.DEFAULT_POOLSIZE. + # Otherwise we can hit "urllib3.connectionpool: Connection pool is full" + self.thread_count = 10 + self.global_threads = ThreadPoolExecutor(max_workers=self.thread_count) + # Need individual threads for tasks to ensure all results are saved before moving on to next task + self.task_threads = None self.ignored_facts = [] self.ignored_arguments = [] self.ignored_files = [] @@ -260,7 +268,7 @@ class CallbackModule(CallbackBase): ) # Record the playbook file - self._get_or_create_file(path, content) + self.global_threads.submit(self._get_or_create_file, path, content) return self.playbook @@ -290,7 +298,7 @@ class CallbackModule(CallbackBase): # Record all the files involved in the play for path in play._loader._FILE_CACHE.keys(): - self._get_or_create_file(path) + self.global_threads.submit(self._get_or_create_file, path) # Create the play self.play = self.client.post( @@ -312,6 +320,7 @@ class CallbackModule(CallbackBase): def v2_playbook_on_task_start(self, task, is_conditional, handler=False): self.log.debug("v2_playbook_on_task_start") self._end_task() + self.task_threads = ThreadPoolExecutor(max_workers=self.thread_count) pathspec = task.get_path() if pathspec: @@ -346,16 +355,16 @@ class CallbackModule(CallbackBase): self.result_started[host.get_name()] = datetime.datetime.now().isoformat() def v2_runner_on_ok(self, result, **kwargs): - self._load_result(result, "ok", **kwargs) + self.task_threads.submit(self._load_result, result, "ok", **kwargs) def v2_runner_on_unreachable(self, result, **kwargs): - self._load_result(result, "unreachable", **kwargs) + self.task_threads.submit(self._load_result, result, "unreachable", **kwargs) def v2_runner_on_failed(self, result, **kwargs): - self._load_result(result, "failed", **kwargs) + self.task_threads.submit(self._load_result, result, "failed", **kwargs) def v2_runner_on_skipped(self, result, **kwargs): - self._load_result(result, "skipped", **kwargs) + self.task_threads.submit(self._load_result, result, "skipped", **kwargs) def v2_playbook_on_stats(self, stats): self.log.debug("v2_playbook_on_stats") @@ -366,15 +375,24 @@ class CallbackModule(CallbackBase): def _end_task(self): if self.task is not None: - self.client.patch( - "/api/v1/tasks/%s" % self.task["id"], status="completed", ended=datetime.datetime.now().isoformat() + self.task_threads.submit( + self.client.patch, + "/api/v1/tasks/%s" % self.task["id"], + status="completed", + ended=datetime.datetime.now().isoformat(), ) + # Flush threads before moving on to next task to make sure all results are saved + self.task_threads.shutdown(wait=True) + self.task_threads = None self.task = None def _end_play(self): if self.play is not None: - self.client.patch( - "/api/v1/plays/%s" % self.play["id"], status="completed", ended=datetime.datetime.now().isoformat() + self.global_threads.submit( + self.client.patch, + "/api/v1/plays/%s" % self.play["id"], + status="completed", + ended=datetime.datetime.now().isoformat(), ) self.play = None @@ -385,9 +403,13 @@ class CallbackModule(CallbackBase): else: status = "completed" - self.playbook = self.client.patch( - "/api/v1/playbooks/%s" % self.playbook["id"], status=status, ended=datetime.datetime.now().isoformat() + self.global_threads.submit( + self.client.patch, + "/api/v1/playbooks/%s" % self.playbook["id"], + status=status, + ended=datetime.datetime.now().isoformat(), ) + self.global_threads.shutdown(wait=True) def _set_playbook_name(self, name): if self.playbook["name"] != name: @@ -482,7 +504,9 @@ class CallbackModule(CallbackBase): for hostname in hosts: host = self._get_or_create_host(hostname) host_stats = stats.summarize(hostname) - self.client.patch( + + self.global_threads.submit( + self.client.patch, "/api/v1/hosts/%s" % host["id"], changed=host_stats["changed"], unreachable=host_stats["unreachable"],