Skip to content
Snippets Groups Projects
Commit 0b5f93f4 authored by jurgenhaas's avatar jurgenhaas
Browse files

Add callback plugin fluentd

parent 9abe6e2c
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
import socket
import uuid
import logging
try:
from fluent import handler
HAS_FLUENT = True
except ImportError:
HAS_FLUENT = False
from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
"""
ansible fluentd callback plugin
ansible.cfg:
callback_plugins = <path_to_callback_plugins_folder>
callback_whitelist = fluentd
and put the plugin in <path_to_callback_plugins_folder>
fluentd config:
input {
tcp {
port => 24224
codec => json
}
}
Requires:
fluent-logger
This plugin makes use of the following environment variables:
FLUENTD_SERVER (optional): defaults to localhost
FLUENTD_PORT (optional): defaults to 24224
FLUENTD_TYPE (optional): defaults to ansible
"""
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'aggregate'
CALLBACK_NAME = 'fluentd'
CALLBACK_NEEDS_WHITELIST = True
def __init__(self):
super(CallbackModule, self).__init__()
if not HAS_FLUENT:
self.disabled = True
self._display.warning("The required fluent-logger is not installed. "
"pip install fluent-logger")
else:
logging.basicConfig(level=logging.DEBUG)
self.logger = logging.getLogger('fluent.test')
self.logger.setLevel(logging.DEBUG)
self.handler = handler.FluentHandler(
'app.ansible',
host=os.getenv('FLUENTD_SERVER', 'localhost'),
port=int(os.getenv('FLUENTD_PORT', 24224)),
buffer_overflow_handler=handler
)
self.logger.addHandler(self.handler)
self.hostname = socket.gethostname()
self.session = str(uuid.uuid1())
self.errors = 0
def v2_playbook_on_start(self, playbook):
self.playbook = playbook._file_name
data = {
'status': "OK",
'host': self.hostname,
'session': self.session,
'ansible_type': "start",
'ansible_playbook': self.playbook,
}
self.logger.info("START " + self.playbook, extra = data)
def v2_playbook_on_stats(self, stats):
summarize_stat = {}
for host in stats.processed.keys():
summarize_stat[host] = stats.summarize(host)
if self.errors == 0:
status = "OK"
else:
status = "FAILED"
data = {
'status': status,
'host': self.hostname,
'session': self.session,
'ansible_type': "finish",
'ansible_playbook': self.playbook,
'ansible_result': json.dumps(summarize_stat), # deprecated field
}
self.logger.info(json.dumps(summarize_stat), extra = data)
def v2_runner_on_ok(self, result, **kwargs):
task_name = str(result._task).replace('TASK: ','')
if task_name == 'setup':
data = {
'status': "OK",
'host': self.hostname,
'session': self.session,
'ansible_type': "setup",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'ansible_task': task_name,
'ansible_facts': self._dump_results(result._result) # deprecated field
}
else:
if 'changed' in result._result.keys():
changed = result._result['changed']
else:
changed = False
data = {
'status': "OK",
'host': self.hostname,
'session': self.session,
'ansible_changed': changed,
'ansible_type': "task",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'ansible_task': task_name,
'ansible_result': self._dump_results(result._result) # deprecated field
}
self.logger.info(self._dump_results(result._result), extra = data)
def v2_runner_on_skipped(self, result, **kwargs):
task_name = str(result._task).replace('TASK: ','')
data = {
'status': "SKIPPED",
'host': self.hostname,
'session': self.session,
'ansible_type': "task",
'ansible_playbook': self.playbook,
'ansible_task': task_name,
'ansible_host': result._host.name
}
self.logger.info("SKIPPED " + task_name, extra = data)
def v2_playbook_on_import_for_host(self, result, imported_file):
data = {
'status': "IMPORTED",
'host': self.hostname,
'session': self.session,
'ansible_type': "import",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'imported_file': imported_file
}
self.logger.info("IMPORT " + imported_file, extra = data)
def v2_playbook_on_not_import_for_host(self, result, missing_file):
data = {
'status': "NOT IMPORTED",
'host': self.hostname,
'session': self.session,
'ansible_type': "import",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'missing_file': missing_file
}
self.logger.info("NOT IMPORTED " + missing_file, extra = data)
def v2_runner_on_failed(self, result, **kwargs):
task_name = str(result._task).replace('TASK: ','')
if 'changed' in result._result.keys():
changed = result._result['changed']
else:
changed = False
data = {
'status': "FAILED",
'host': self.hostname,
'session': self.session,
'ansible_changed': changed,
'ansible_type': "task",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'ansible_task': task_name,
'ansible_result': self._dump_results(result._result) # deprecated field
}
self.errors += 1
self.logger.error(self._dump_results(result._result), extra = data)
def v2_runner_on_unreachable(self, result, **kwargs):
task_name = str(result._task).replace('TASK: ','')
data = {
'status': "UNREACHABLE",
'host': self.hostname,
'session': self.session,
'ansible_type': "task",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'ansible_task': task_name,
'ansible_result': self._dump_results(result._result) # deprecated field
}
self.logger.error(self._dump_results(result._result), extra = data)
def v2_runner_on_async_failed(self, result, **kwargs):
task_name = str(result._task).replace('TASK: ','')
data = {
'status': "FAILED",
'host': self.hostname,
'session': self.session,
'ansible_type': "task",
'ansible_playbook': self.playbook,
'ansible_host': result._host.name,
'ansible_task': task_name,
'ansible_result': self._dump_results(result._result) # deprecated field
}
self.errors += 1
self.logger.error(self._dump_results(result._result), extra = data)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment