8 python 調用 zabbix 的 api 接口添加主機、查詢組、主機、模板
Python實戰-從菜鳥到大牛的進階之路 作者:極客學院 投票推薦 加入書簽 留言反饋
zabbix 有一個 api 接口,可以調用這些幾口來自動添加主機,查詢 zabbix 中監控的主機,監控的模板、監控的主機組等信息,使用也非常的方便。以下是用 python 調用 zabbix 的 api 接口來實現上述功能:
#!/usr/bin/env python # -*- coding: utf-8 -*-import jsonimport urllib2import sysss zabbixtools: def __init__(self):self.url = "http://192.168.100.200/zabbix/api_jsonrpc.php"self.header = {"content-type": "application/json"}self.authid = self.user_login def user_login(self):data = json.dumps({ "jsonrpc": "2.0", "method": "user.login", "params": {"user": "admin","password": "zabbix"}, "id": 0 })request = urllib2.request(self.url,data)for key in self.header: request.add_header(key,self.header[key])try: result = urllib2.urlopen(request)except urlerror as e: print "auth failed, please check your name and password:",e.codeelse: response = json.loads(result.read) result.close authid = response[\''result\''] return authid def get_data(self,data,hostip=""):request = urllib2.request(self.url,data)for key in self.header: request.add_header(key,self.header[key])try: result = urllib2.urlopen(request)except urlerror as e: if hasattr(e, \''reason\''):print \''we failed to reach a server.\''print \''reason: \'', e.reason elif hasattr(e, \''code\''):print \''the server could not fulfill the request.\''print \''error code: \'', e.code return 0else: response = json.loads(result.read) result.close return response def host_get(self,hostip):#hostip = raw_input("\033[1;35;40m%s\033[0m" % \''enter your check host:host_ip :\'')data = json.dumps({ "jsonrpc": "2.0", "method": "host.get", "params": {"output":["hostid","name","status","host"],"filter": {"host": [hostip]}}, "auth": self.authid, "id": 1})res = self.get_data(data)[\''result\'']if (res != 0) and (len(res) != 0): #for host in res: host = res[0] if host[\''status\''] == \''1\'':print "t","\033[1;31;40m%s\033[0m" % "host_ip:","\033[1;31;40m%s\033[0m" % host[\''host\''].ljust(15),\''t\'',"\033[1;31;40m%s\033[0m" % "host_name:","\033[1;31;40m%s\033[0m" % host[\''name\''].encode(\''gbk\''),\''t\'',"\033[1;31;40m%s\033[0m" % u\''未在監控狀態\''.encode(\''gbk\'')return host[\''hostid\''] elif host[\''status\''] == \''0\'':print "t","\033[1;32;40m%s\033[0m" % "host_ip:","\033[1;32;40m%s\033[0m" % host[\''host\''].ljust(15),\''t\'',"\033[1;32;40m%s\033[0m" % "host_name:","\033[1;32;40m%s\033[0m" % host[\''name\''].encode(\''gbk\''),\''t\'',"\033[1;32;40m%s\033[0m" % u\''在監控狀態\''.encode(\''gbk\'')return host[\''hostid\''] printelse: print \''t\'',"\033[1;31;40m%s\033[0m" % "get host error or cannot find this host,please check !" return 0 def host_del(self):hostip = raw_input("\033[1;35;40m%s\033[0m" % \''enter your check host:host_ip :\'')hostid = self.host_get(hostip)if hostid == 0: print \''t\'',"\033[1;31;40m%s\033[0m" % "this host cannot find in zabbix,please check it !" sys.exitdata = json.dumps({ "jsonrpc": "2.0", "method": "host.delete", "params": [{"hostid": hostid}], "auth": self.authid, "id": 1})res = self.get_data(data)[\''result\'']if \''hostids\'' in res.keys: print "t","\033[1;32;40m%s\033[0m" % "delet host:%s sess !" % hostipelse: print "t","\033[1;31;40m%s\033[0m" % "delet host:%s failure !" % hostip def hostgroup_get(self):data = json.dumps({ "jsonrpc": "2.0", "method": "hostgroup.get", "params": {"output": "extend",}, "auth": self.authid, "id": 1, })res = self.get_data(data)if \''result\'' in res.keys: res = res[\''result\''] if (res !=0) or (len(res) != 0):print "\033[1;32;40m%s\033[0m" % "number of group: ", "\033[1;31;40m%d\033[0m" % len(res)for host in res: print "t","hostgroup_id:",host[\''groupid\''],"t","hostgroup_name:",host[\''name\''].encode(\''gbk\'')printelse: print "get hostgroup error,please check !" def temte_get(self):data = json.dumps({ "jsonrpc": "2.0", "method": "temte.get", "params": {"output": "extend",}, "auth": self.authid, "id": 1, })res = self.get_data(data)#[\''result\'']if \''result\'' in res.keys: res = res[\''result\''] if (res !=0) or (len(res) != 0):print "\033[1;32;40m%s\033[0m" % "number of temte: ", "\033[1;31;40m%d\033[0m" % len(res)for host in res: print "t","temte_id:",host[\''temteid\''],"t","temte_name:",host[\''name\''].encode(\''gbk\'')printelse: print "get temte error,please check !" def host_create(self):hostip = raw_input("\033[1;35;40m%s\033[0m" % \''enter your:host_ip :\'')groupid = raw_input("\033[1;35;40m%s\033[0m" % \''enter your:group_id :\'')temteid = raw_input("\033[1;35;40m%s\033[0m" % \''enter your:tempate_id :\'')g_list=t_list=for i in groupid.split(\'',\''): var = {} var[\''groupid\''] = i g_list.append(var)for i in temteid.split(\'',\''): var = {} var[\''temteid\''] = i t_list.append(var)if hostip and groupid and temteid: data = json.dumps( {"jsonrpc": "2.0","method": "host.create","params": { "host": hostip, "interfaces": [{ "type": 1, "main": 1, "useip": 1, "ip": hostip, "dns": "", "port": "10050"} ], "groups": g_list, "temtes": t_list, },"auth": self.authid,"id": 1,}) res = self.get_data(data,hostip) if \''result\'' in res.keys:res = res[\''result\'']if \''hostids\'' in res.keys: print "\033[1;32;40m%s\033[0m" % "create host sess" else:print "\033[1;31;40m%s\033[0m" % "create host failure: %s" % res[\''error\''][\''data\'']else: print "\033[1;31;40m%s\033[0m" % "enter error: ip or groupid or tempateid is null,please check it !"def main: test = zabbixtools #test.temte_get #test.hostgroup_get #test.host_get test.host_del #test.host_createif __name__ == "__main__": main </pre>
相關的材料的可以參考官方文檔。這個隻是一些功能模塊,包含獲取主機,主機組、模板、刪除主機等功能,可以根據需要進行調整,實現 zabbix 的批量化和自動化管理。因為是在 linux 運行,所以設置了輸出終端的字體顏色方便區分,如果不需要,自行刪除即可。
#!/usr/bin/env python # -*- coding: utf-8 -*-import jsonimport urllib2import sysss zabbixtools: def __init__(self):self.url = "http://192.168.100.200/zabbix/api_jsonrpc.php"self.header = {"content-type": "application/json"}self.authid = self.user_login def user_login(self):data = json.dumps({ "jsonrpc": "2.0", "method": "user.login", "params": {"user": "admin","password": "zabbix"}, "id": 0 })request = urllib2.request(self.url,data)for key in self.header: request.add_header(key,self.header[key])try: result = urllib2.urlopen(request)except urlerror as e: print "auth failed, please check your name and password:",e.codeelse: response = json.loads(result.read) result.close authid = response[\''result\''] return authid def get_data(self,data,hostip=""):request = urllib2.request(self.url,data)for key in self.header: request.add_header(key,self.header[key])try: result = urllib2.urlopen(request)except urlerror as e: if hasattr(e, \''reason\''):print \''we failed to reach a server.\''print \''reason: \'', e.reason elif hasattr(e, \''code\''):print \''the server could not fulfill the request.\''print \''error code: \'', e.code return 0else: response = json.loads(result.read) result.close return response def host_get(self,hostip):#hostip = raw_input("\033[1;35;40m%s\033[0m" % \''enter your check host:host_ip :\'')data = json.dumps({ "jsonrpc": "2.0", "method": "host.get", "params": {"output":["hostid","name","status","host"],"filter": {"host": [hostip]}}, "auth": self.authid, "id": 1})res = self.get_data(data)[\''result\'']if (res != 0) and (len(res) != 0): #for host in res: host = res[0] if host[\''status\''] == \''1\'':print "t","\033[1;31;40m%s\033[0m" % "host_ip:","\033[1;31;40m%s\033[0m" % host[\''host\''].ljust(15),\''t\'',"\033[1;31;40m%s\033[0m" % "host_name:","\033[1;31;40m%s\033[0m" % host[\''name\''].encode(\''gbk\''),\''t\'',"\033[1;31;40m%s\033[0m" % u\''未在監控狀態\''.encode(\''gbk\'')return host[\''hostid\''] elif host[\''status\''] == \''0\'':print "t","\033[1;32;40m%s\033[0m" % "host_ip:","\033[1;32;40m%s\033[0m" % host[\''host\''].ljust(15),\''t\'',"\033[1;32;40m%s\033[0m" % "host_name:","\033[1;32;40m%s\033[0m" % host[\''name\''].encode(\''gbk\''),\''t\'',"\033[1;32;40m%s\033[0m" % u\''在監控狀態\''.encode(\''gbk\'')return host[\''hostid\''] printelse: print \''t\'',"\033[1;31;40m%s\033[0m" % "get host error or cannot find this host,please check !" return 0 def host_del(self):hostip = raw_input("\033[1;35;40m%s\033[0m" % \''enter your check host:host_ip :\'')hostid = self.host_get(hostip)if hostid == 0: print \''t\'',"\033[1;31;40m%s\033[0m" % "this host cannot find in zabbix,please check it !" sys.exitdata = json.dumps({ "jsonrpc": "2.0", "method": "host.delete", "params": [{"hostid": hostid}], "auth": self.authid, "id": 1})res = self.get_data(data)[\''result\'']if \''hostids\'' in res.keys: print "t","\033[1;32;40m%s\033[0m" % "delet host:%s sess !" % hostipelse: print "t","\033[1;31;40m%s\033[0m" % "delet host:%s failure !" % hostip def hostgroup_get(self):data = json.dumps({ "jsonrpc": "2.0", "method": "hostgroup.get", "params": {"output": "extend",}, "auth": self.authid, "id": 1, })res = self.get_data(data)if \''result\'' in res.keys: res = res[\''result\''] if (res !=0) or (len(res) != 0):print "\033[1;32;40m%s\033[0m" % "number of group: ", "\033[1;31;40m%d\033[0m" % len(res)for host in res: print "t","hostgroup_id:",host[\''groupid\''],"t","hostgroup_name:",host[\''name\''].encode(\''gbk\'')printelse: print "get hostgroup error,please check !" def temte_get(self):data = json.dumps({ "jsonrpc": "2.0", "method": "temte.get", "params": {"output": "extend",}, "auth": self.authid, "id": 1, })res = self.get_data(data)#[\''result\'']if \''result\'' in res.keys: res = res[\''result\''] if (res !=0) or (len(res) != 0):print "\033[1;32;40m%s\033[0m" % "number of temte: ", "\033[1;31;40m%d\033[0m" % len(res)for host in res: print "t","temte_id:",host[\''temteid\''],"t","temte_name:",host[\''name\''].encode(\''gbk\'')printelse: print "get temte error,please check !" def host_create(self):hostip = raw_input("\033[1;35;40m%s\033[0m" % \''enter your:host_ip :\'')groupid = raw_input("\033[1;35;40m%s\033[0m" % \''enter your:group_id :\'')temteid = raw_input("\033[1;35;40m%s\033[0m" % \''enter your:tempate_id :\'')g_list=t_list=for i in groupid.split(\'',\''): var = {} var[\''groupid\''] = i g_list.append(var)for i in temteid.split(\'',\''): var = {} var[\''temteid\''] = i t_list.append(var)if hostip and groupid and temteid: data = json.dumps( {"jsonrpc": "2.0","method": "host.create","params": { "host": hostip, "interfaces": [{ "type": 1, "main": 1, "useip": 1, "ip": hostip, "dns": "", "port": "10050"} ], "groups": g_list, "temtes": t_list, },"auth": self.authid,"id": 1,}) res = self.get_data(data,hostip) if \''result\'' in res.keys:res = res[\''result\'']if \''hostids\'' in res.keys: print "\033[1;32;40m%s\033[0m" % "create host sess" else:print "\033[1;31;40m%s\033[0m" % "create host failure: %s" % res[\''error\''][\''data\'']else: print "\033[1;31;40m%s\033[0m" % "enter error: ip or groupid or tempateid is null,please check it !"def main: test = zabbixtools #test.temte_get #test.hostgroup_get #test.host_get test.host_del #test.host_createif __name__ == "__main__": main </pre>
相關的材料的可以參考官方文檔。這個隻是一些功能模塊,包含獲取主機,主機組、模板、刪除主機等功能,可以根據需要進行調整,實現 zabbix 的批量化和自動化管理。因為是在 linux 運行,所以設置了輸出終端的字體顏色方便區分,如果不需要,自行刪除即可。