# -*- coding: utf-8 -*-
"""Ganglia gmond Python metric module exposing nginx stub_status counters.

A background thread (``NginxThread``) polls the stub_status page every
``_refresh_rate`` seconds and caches the parsed counters in ``_conns``.
The metric callback ``nginx_stub_status`` only reads that cache, so it never
blocks on the network.
"""
import time
import threading
import re
import os

try:
    import urllib2
except ImportError:  # Python 3: urllib.request provides the same urlopen()
    import urllib.request as urllib2

# based on https://github.com/csakatoku/ganglia-pymodule_nginx_status/

# Worker thread object (created by metric_init)
_WorkerThread = None

# Synchronization lock guarding _conns and the start of the worker thread
_glock = threading.Lock()

# Refresh rate of the stub_status data, in seconds
_refresh_rate = 20

# Global dictionary storing the counts of the last state
_conns = {
    'nginx_active_connections': 0,
    'nginx_server_accepts': 0,
    'nginx_server_handled': 0,
    'nginx_server_requests': 0,
    'nginx_reading': 0,
    'nginx_writing': 0,
    'nginx_waiting': 0,
}

# Patterns for the three data lines of the stub_status page. They are applied
# with re.match, i.e. anchored at the start of the respective line.
_ACTIVE_RE = re.compile(r'Active connections: (\d+)')
_TOTALS_RE = re.compile(r' (\d+) (\d+) (\d+)')
_STATES_RE = re.compile(r'Reading: (\d+) Writing: (\d+) Waiting: (\d+)')


def nginx_stub_status(name):
    """Metric callback: return the cached counter called ``name`` as an int.

    Returns 0 when ``metric_init`` has not created a worker yet. The worker
    thread is started lazily on the first call. Raises ``KeyError`` for a
    name that is not a key of ``_conns``.
    """
    worker = _WorkerThread
    if worker is None:
        return 0

    with _glock:
        if not worker.running and not worker.shuttingdown:
            # Mark the worker as running *before* start(): run() sets the
            # flag only once the new thread is scheduled, so a second
            # callback arriving in between would call start() again and
            # raise RuntimeError ("threads can only be started once").
            worker.running = True
            worker.start()
        return int(_conns[name])


def _descriptor(name, units, description):
    """Build one metric description; all metrics share the other fields."""
    return {
        'name': name,
        'call_back': nginx_stub_status,
        'time_max': 20,
        'value_type': 'uint',
        'units': units,
        'slope': 'both',
        'format': '%u',
        'description': description,
        'groups': 'nginx',
    }


# Metric descriptions
_descriptors = [
    _descriptor('nginx_active_connections', 'Connections',
                'Number of all open connections including connections to backends'),
    _descriptor('nginx_server_accepts', 'Connections',
                'Total number of connections nginx accepts'),
    _descriptor('nginx_server_handled', 'Connections',
                'Total number of connections nginx handled'),
    _descriptor('nginx_server_requests', 'Requests',
                'Total number of requests nginx handles'),
    _descriptor('nginx_reading', 'Requests', 'reading'),
    _descriptor('nginx_writing', 'Requests', 'writing'),
    _descriptor('nginx_waiting', 'Requests', 'waiting'),
]


class NginxThread(threading.Thread):
    """
    This thread continually gathers the current state of the nginx on the
    machine via the nginx stub_status module and publishes it in ``_conns``.
    """

    def __init__(self, url):
        super(NginxThread, self).__init__()
        self.running = False
        self.shuttingdown = False
        # Set by shutdown() so the wait between two polls ends immediately.
        self._wakeup = threading.Event()
        # The URL where stub_status is enabled.
        # NOTE: the ``url`` argument is deliberately ignored; the original
        # author recorded that using it "doesn't work". The ``<%= ... %>``
        # expression looks like an ERB/Chef template placeholder, presumably
        # substituted when this file is rendered -- TODO confirm.
        self.url = "http://127.0.0.1/<%= node[:ganglia][:nginx][:status_url] %>"

    def shutdown(self):
        """Ask the polling loop to stop and wait for the thread to finish."""
        self.shuttingdown = True
        self._wakeup.set()
        # join() raises RuntimeError on a thread that was never started,
        # which happens when no metric was requested before the cleanup.
        if self.is_alive():
            self.join()

    @staticmethod
    def parse_status(text):
        """Parse the text of a stub_status page into a counter dictionary.

        The page is expected to look like::

            Active connections: 1
            server accepts handled requests
             10 10 25
            Reading: 0 Writing: 1 Waiting: 0

        Returns a dict with the same keys as ``_conns``. A counter whose
        line is missing or does not match stays 0.
        """
        counters = dict.fromkeys(_conns, 0)
        # Pad so that a short or empty page still yields four lines.
        lines = text.split('\n') + [''] * 4

        # Active connections
        found = _ACTIVE_RE.match(lines[0])
        if found:
            counters['nginx_active_connections'] = int(found.group(1))

        # lines[1] is the "server accepts handled requests" header: skipped.
        # lines[2] holds the actual accepts / handled / requests numbers.
        found = _TOTALS_RE.match(lines[2])
        if found:
            accepts, handled, requests = found.groups()
            counters['nginx_server_accepts'] = int(accepts)
            counters['nginx_server_handled'] = int(handled)
            counters['nginx_server_requests'] = int(requests)

        # Reading: 0 Writing: 1 Waiting: 0
        found = _STATES_RE.match(lines[3])
        if found:
            reading, writing, waiting = found.groups()
            counters['nginx_reading'] = int(reading)
            counters['nginx_writing'] = int(writing)
            counters['nginx_waiting'] = int(waiting)

        return counters

    def _fetch(self):
        """Download the stub_status page and return its body as text."""
        res = urllib2.urlopen(self.url)
        try:
            body = res.read()
        finally:
            res.close()
        if not isinstance(body, str):
            # Python 3 returns bytes; the page is plain ASCII.
            body = body.decode('ascii', 'replace')
        return body

    def run(self):
        """Poll nginx until shutdown() is called, refreshing ``_conns``."""
        # Set the state of the running thread
        self.running = True
        try:
            # continue running until a shutdown event is indicated
            while not self.shuttingdown:
                try:
                    snapshot = self.parse_status(self._fetch())
                except Exception:
                    # Thread boundary: a failed poll (connection refused,
                    # HTTP error, malformed URL, ...) must not kill the
                    # collector. Keep the previous values and retry after
                    # the refresh period.
                    snapshot = None

                if snapshot is not None:
                    # copy the temporary state to the global state dictionary
                    with _glock:
                        _conns.update(snapshot)

                # Wait for the refresh period before collecting the status
                # again; returns at once when shutdown() sets the event.
                self._wakeup.wait(_refresh_rate)
        finally:
            # Set the current state of the thread after a shutdown has been
            # indicated.
            self.running = False


def metric_init(params):
    '''Initialize the nginx status module and return the metric definition
    dictionary object for each metric.

    ``params`` is the parameter dictionary from gmond.conf. ``refresh`` sets
    the polling period in seconds (20 when it is not a valid integer);
    ``url`` is passed on to ``NginxThread``.'''
    global _refresh_rate, _WorkerThread

    # Read the refresh from the gmond.conf parameters.
    if 'refresh' in params:
        try:
            _refresh_rate = int(params['refresh'])
        except (ValueError, TypeError):
            _refresh_rate = 20

    # Create the worker thread; it is started lazily by nginx_stub_status.
    _WorkerThread = NginxThread(params.get('url'))

    # Return the metric descriptions to Gmond
    return _descriptors


def metric_cleanup():
    '''Clean up the metric module.'''
    # Tell the worker thread to shutdown (nothing to do if metric_init was
    # never called).
    if _WorkerThread is not None:
        _WorkerThread.shutdown()


# This code is for debugging and unit testing
if __name__ == '__main__':
    try:
        params = {
            'refresh': '20',
            'url': 'http://127.0.0.1/<%= node[:ganglia][:nginx][:status_url] %>',
        }
        metric_init(params)
        while 1:
            for d in _descriptors:
                v = d['call_back'](d['name'])
                print('value for %s is %u' % (d['name'], v))
            time.sleep(5)
    except KeyboardInterrupt:
        time.sleep(0.2)
        # The worker is a non-daemon thread, so force the process to exit.
        os._exit(1)