Read Logstash logs in real time and trigger an alarm when an abnormal error keyword appears.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __author__ = caozhi
# create_time 2018-11-12, update_time 2018-11-15
# version = 1.0
# Video high availability alarms
# 1 Reading logs using cursor move (the read offset is persisted between runs)
# 2 On-line business log files are cut (rotated); after cutting, the logs
#   from the last cut are read
import os
import sys
import json
import time
import re

# NOTE(review): the path literals below were lost from the published text
# (only the directory "/data/logs/lmrs/" survived). The file names are
# placeholders -- confirm them against the real deployment before use.
LOG_FILE = '/data/logs/lmrs/lmrs.log'   # live log written by the service
cini = 'conf.ini'                       # JSON state file: seek / inode / last_file
log_file = LOG_FILE

FALCON_PUSH_URL = "http://127.0.0.1:1988/v1/push"
HTTP_TIMEOUT_SECONDS = 10


def _default_conf():
    """Return a fresh copy of the state used when no valid state file exists.

    The inode value is the hard-coded seed from the original script.
    NOTE(review): ``last_file`` is assumed to be the live log path, because the
    rotated file name is built as ``last_file + "-YYYYmmdd_HHMM"`` -- confirm.
    """
    return {"seek": 0, "inode": 922817, "last_file": LOG_FILE}


def _rotated_name(base, now):
    """Build the rotated log name for *now*, minutes rounded down to a multiple of 10.

    ``now`` is a ``time.struct_time``; a single sample is used for both the
    date and the time part so the two halves cannot straddle a minute.
    """
    return base + time.strftime("-%Y%m%d_", now) + time.strftime("%H%M", now)[:-1] + '0'


def readconf():
    """Load the persisted state from ``cini``.

    If the file is missing, unreadable, or not valid JSON, a default state is
    written back to ``cini`` and returned.
    """
    try:
        with open(cini, 'r') as f:
            CONF = json.load(f)
    except (OSError, ValueError):
        CONF = _default_conf()
        writeconf(CONF=CONF)
        print(' Configuration file is missing, automatically create a new one ')
    return CONF


def writeconf(CONF):
    """Serialize *CONF* as JSON into ``cini``, replacing any previous content."""
    with open(cini, 'w+') as e:
        json.dump(CONF, e)


def read_log(log_file, seek):
    """Read JSON log lines from *log_file*, starting at offset *seek*.

    Every line whose ``rtype`` is 6, ``uri`` is ``'/publish'`` and ``event`` is 0
    is reported through :func:`record`.

    Returns ``(value, stream, seek)`` where ``value``/``stream`` describe the
    LAST line read (``1, name`` if it matched, ``0, 0`` otherwise) and ``seek``
    is the offset after the last line.

    Exits the process (``sys.exit``) when the file cannot be opened, when no
    new data was appended since *seek*, or when a line is not valid JSON (in
    which case the state file is reset to the defaults first).
    """
    try:
        f = open(log_file, 'r')
    except FileNotFoundError:
        # The expected (rotated) file does not exist: fall back to the live
        # log and read it from the beginning.
        f = open(LOG_FILE, 'r')
        seek = 0
        print('The last file read failed, please check the cut log file')
    except OSError:
        print('Log file open error, exit program')
        sys.exit()

    value = 0
    stream = 0
    with f:  # guarantees the handle is closed, including on sys.exit()
        f.seek(seek)
        line = f.readline()
        if f.tell() == seek:
            print('No logs were appended, exiting the program')
            sys.exit()
        # readline()/tell() are used instead of ``for line in f`` because
        # tell() is disabled while iterating a text file.
        while line:
            try:
                logstash = json.loads(line)
            except ValueError:
                writeconf(CONF=_default_conf())
                print('Error loading json data, recreating a new config file')
                sys.exit()
            # A time-of-day filter on 'log_time' was disabled in the original
            # script and is intentionally not applied here.
            if (isinstance(logstash, dict)
                    and logstash.get('rtype') == 6
                    and logstash.get('uri') == '/publish'
                    and logstash.get('event') == 0):
                value = 1
                stream = logstash.get('name')
                print('{} {}'.format(value, stream))
                record(value=value, stream=stream)
            else:
                value = 0
                stream = 0
            line = f.readline()
        seek = f.tell()
    return value, stream, seek


def record(value, stream):
    """Push one GAUGE data point for *stream* to the local push endpoint.

    The payload is a one-element JSON list POSTed to ``FALCON_PUSH_URL``; the
    response status code and body are printed.
    """
    # Imported here so the log-parsing helpers stay usable on hosts where the
    # third-party ``requests`` package is not installed.
    import requests

    now = int(time.time())
    item = {
        'metric': 'recording_high_availability_monitor',
        'endpoint': os.uname()[1],
        'timestamp': now,
        'step': 60,
        'value': value,
        'counterType': 'GAUGE',
        'Tags': '{}={}'.format(now, stream),
    }
    data = [item]
    print('This is the json data for data')
    print(data)
    # A timeout keeps a stuck agent from hanging the monitor forever.
    falcon_request = requests.post(
        FALCON_PUSH_URL, data=json.dumps(data), timeout=HTTP_TIMEOUT_SECONDS)
    print('The json parameter request returns a status code of:' + str(falcon_request.status_code))
    print('The json parameter request returned as:' + str(falcon_request.text))


def main():
    """Run one monitoring pass: pick the file to read, scan it, persist state."""
    now = time.localtime()
    print()
    print('***************************************')
    print('Time of this script execution:{}'.format(time.strftime("%Y%m%d_%H%M", now)))
    CONF = readconf()
    print('first_CONF :{}'.format(CONF))
    target = log_file
    print('NO1.log_file', target)
    last_inode = CONF['inode']
    inode = os.stat(target).st_ino
    print('last_inode: {} inode: {}'.format(last_inode, inode))
    seek = CONF['seek']
    if inode == last_inode:
        next_file = 0
    else:
        # The live log has a new inode, i.e. it was cut since the last run:
        # finish reading the cut file from the saved offset.
        target = _rotated_name(CONF['last_file'], now)
        next_file = 1
    print('NO2.log_file', target)
    value, stream, seek = read_log(log_file=target, seek=seek)
    # After draining a cut file, the next run starts the live log from 0.
    CONF['seek'] = 0 if next_file else seek
    CONF['inode'] = os.stat(LOG_FILE).st_ino
    writeconf(CONF=CONF)
    print('last_CONF :{}'.format(CONF))


if __name__ == '__main__':
    main()
Additional knowledge: calling an external command from Logstash with the exec output plugin
I'll cut to the chase: just look at the code!
# NOTE(review): the file names after `cat` and under /bin/ were lost from the
# published text; the gaps are kept as-is below and must be filled in.
[elk@Vsftp logstash]$ cat
input {
  stdin { }
}
filter {
  grok {
    match => [ "message","(?m)\s*%{TIMESTAMP_ISO8601:time}\s*(?<Level>(\S+)).*"]
  }
  date {
    match => ["time", "yyyy-MM-dd HH:mm:ss,SSS"]
  }
  mutate {
    add_field =>["type","tailong"]
    add_field =>["messager","%{type}-%{message}"]
    remove_field =>["message"]
  }
}
output {
  if ([Level] == "ERROR" or [messager] =~ "Exception" ) and [messager] !~ "Winkin service not connected." and [messager] !~ "Error calling the Winkin Agent System interface." and [messager] !~ "BusinessException" {
    exec {
      # CAUTION: the log message is interpolated into a command line; a
      # message containing quotes or shell metacharacters can alter the
      # command. Only use this with trusted log sources.
      command => "/bin/ \"%{messager}\" \"%{type}\" "
    }
  }
  stdout { codec =>rubydebug }
}

Vsftp:/root# cat /bin/
#!/usr/bin/perl
# Mail the command-line arguments (the matched log message) to a fixed address.
use Net::SMTP;
use HTTP::Date qw(time2iso str2time time2iso time2isoz);
use Data::Dumper;
use Getopt::Std;
use vars qw($opt_d );
getopts('d:');

# mail_user should be your_mail@
$message= "@ARGV";
$env="$opt_d";

# send_mail($to_address): send $message, prefixed with the current time,
# to $to_address through an authenticated SMTP session.
sub send_mail{
    my $CurrTime = time2iso(time());
    my $to_address = shift;
    # The account, password and server below are redacted in the source;
    # replace them with real values.
    my $mail_user = '@';
    my $mail_pwd = 'xx';
    my $mail_server = 'smtp.';
    my $from = "From: $mail_user\n";
    my $subject = "Subject: zjcap info\n";
    # Uses the global $message set from @ARGV above.
    my $info = "$CurrTime--$message";
    my $message = <<CONTENT;
$info
CONTENT
    # new() returns undef when the connection fails; stop with a clear error
    # instead of calling a method on undef.
    my $smtp = Net::SMTP->new($mail_server)
        or die "Cannot connect to SMTP server $mail_server: $@";
    $smtp->auth($mail_user, $mail_pwd) || die "Auth Error! $!";
    $smtp->mail($mail_user);
    $smtp->to($to_address);
    $smtp->data();                  # begin the data
    $smtp->datasend($from);         # set user
    $smtp->datasend($subject);      # set subject
    $smtp->datasend("\n\n");
    $smtp->datasend("$message\n");  # set content
    $smtp->dataend();
    $smtp->quit();
};
send_mail ('@');

# Sample run: one line typed on stdin, followed by the rubydebug output.
2017-01-12 10:19:19,888 jjjjj Exception
{
      "@version" => "1",
    "@timestamp" => "2017-01-12T02:19:19.888Z",
          "host" => "Vsftp",
          "time" => "2017-01-12 10:19:19,888",
         "Level" => "jjjjj",
          "type" => "tailong",
      "messager" => "tailong-2017-01-12 10:19:19,888 jjjjj Exception"
}
That is all of the Python code for real-time monitoring of Logstash logs that I have to share with you.