SoFunction
Updated on 2024-11-12

Python code for monitoring Logstash logs in real time

The script reads the Logstash log incrementally and raises an alarm whenever a line matches the abnormal-event keywords.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __author__ = caozhi
# create_time 2018-11-12,update_time 2018-11-15
# version = 1.0
# Video high availability alarms
# 1. The log is read incrementally by remembering the file offset (seek/tell) between runs.
# 2. The production log file is rotated; after a rotation, the rest of the most recently rotated file is read.

import os
import sys
import json
import requests
import time
import re

# NOTE(review): both path literals were truncated when this listing was
# extracted (only the opening quote survived), so the values below are
# placeholders, not the author's originals.
# TODO: point these at the real state file and the live log before running.
cini = '/path/to/logstash_monitor_state.json'  # JSON state file used by readconf/writeconf
log_file = '/path/to/live_logstash.log'  # live (not yet rotated) log file

def readconf():
    """Load the monitor's persisted state from the JSON file at ``cini``.

    Returns:
        dict: state with the keys ``seek`` (offset at which the previous run
        stopped reading), ``inode`` (inode recorded for the log by the
        previous run) and ``last_file`` (prefix the rotated file name is
        built from).

    If the file is missing, unreadable or does not contain valid JSON, a
    default state is written via ``writeconf`` and returned instead.
    """
    try:
        with open(cini, 'r') as f:
            CONF = json.load(f)
    except (OSError, ValueError):
        # NOTE(review): the "last_file" literal was truncated in this listing.
        # The value below mirrors the reset state used in read_log and may
        # itself be incomplete -- confirm against the real deployment.
        CONF = {"seek": 0, "inode": 922817, "last_file": "/data/logs/lmrs/"}
        writeconf(CONF=CONF)
        print(' Configuration file is missing, automatically create a new one ')
    return CONF

def writeconf(CONF):
    """Persist the monitor state to the JSON file at ``cini``.

    Args:
        CONF: JSON-serialisable state dict; the file is overwritten.
    """
    with open(cini, 'w+') as e:
        json.dump(CONF, e)

def read_log(log_file, seek, fallback_file='/path/to/live_logstash.log'):
    """Scan JSON log lines appended after ``seek`` and report matching events.

    Each line is parsed as one JSON object. A line whose ``rtype`` is 6,
    ``uri`` is ``'/publish'`` and ``event`` is 0 is reported through
    ``record`` with value 1 and the line's ``name`` field.

    Args:
        log_file: path of the log file to read.
        seek: offset to resume reading from.
        fallback_file: file opened from offset 0 when ``log_file`` does not
            exist. NOTE(review): the original hard-coded literal was lost in
            this listing; the default is a placeholder -- TODO set the real
            path of the live log.

    Returns:
        tuple: ``(value, stream, seek)`` where ``value``/``stream`` describe
        the last line processed (``1, name`` on a match, otherwise ``0, 0``)
        and ``seek`` is the offset after the last line read.

    Raises:
        SystemExit: when the file cannot be opened, nothing new was appended,
            or a line is not valid JSON (the state file is reset first).
    """
    try:
        f = open(log_file, 'r')
    except FileNotFoundError:
        f = open(fallback_file, 'r')
        seek = 0
        print('The last file read failed, please check the cut log file')
    except OSError:
        print('Log file open error, exit program')
        sys.exit()

    value = 0
    stream = 0
    # The context manager closes the handle on every path, including the
    # sys.exit() calls below; the original never closed it.
    with f:
        f.seek(seek)
        line = f.readline()
        if f.tell() == seek:
            print('No logs were appended, exiting the program')
            sys.exit()

        while line:
            try:
                logstash = json.loads(line)
            except ValueError:
                # Unparseable line: reset the persisted state and stop.
                CONF = {"seek": 0, "inode": 922817, "last_file": "/data/logs/lmrs/"}
                writeconf(CONF=CONF)
                print('Error loading json data, recreating a new config file')
                sys.exit()

            # A disabled variant of this check in the original appears to have
            # also required the current time (formatted "%Y:%H:%M") to match
            # the line's 'log_time' field.
            if (logstash.get('rtype') == 6
                    and logstash.get('uri') == '/publish'
                    and logstash.get('event') == 0):
                value = 1
                stream = logstash.get('name')
                print('{} {}'.format(value, stream))
                record(value=value, stream=stream)
            else:
                value = 0
                stream = 0
            line = f.readline()
        seek = f.tell()

    return value, stream, seek

def record(value, stream):
    """Push one monitoring sample to the HTTP agent on 127.0.0.1:1988.

    Args:
        value: metric value to report.
        stream: stream name; embedded in the sample's ``Tags`` field.

    The payload is a JSON list holding a single sample. The agent's status
    code and response body are printed.

    NOTE(review): the call names on this path were lost in the listing and are
    reconstructed as ``os.uname``, ``time.time``, ``requests.post`` and
    ``json.dumps`` from the file's imports -- confirm.
    """
    now = int(time.time())
    sample = {
        'metric': 'recording_high_availability_monitor',
        'endpoint': os.uname()[1],  # host name of this machine
        'timestamp': now,
        'step': 60,
        'value': value,
        'counterType': 'GAUGE',
        'Tags': '{}={}'.format(now, stream),
    }
    data = [sample]

    print('This is the json data for data')
    print(data)
    # Alternative kept from the original: requests.post(url, json=data)
    falcon_request = requests.post("http://127.0.0.1:1988/v1/push", data=json.dumps(data))
    print('The json parameter request returns a status code of:' + str(falcon_request.status_code))
    print('The json parameter request returned as:' + str(falcon_request.text))

if __name__ == '__main__':
    # One monitoring pass: load the saved state, decide whether the live log
    # was rotated since the previous run, scan the new lines, save the state.
    print()
    print('***************************************')
    print('Time of this script execution:{}'.format(time.strftime("%Y%m%d_%H%M", time.localtime())))
    CONF = readconf()
    print('first_CONF :{}'.format(CONF))
    print('NO1.log_file', log_file)

    # Remember the live log path: log_file is re-pointed at the rotated file
    # below, but the inode saved at the end must be the live file's.
    # NOTE(review): the path literal used for that final stat was lost in the
    # listing; it is assumed to be the same live log -- confirm.
    live_log = log_file
    last_inode = CONF['inode']
    inode = os.stat(live_log).st_ino
    print('last_inode: {} inode: {}'.format(last_inode, inode))

    if inode == last_inode:
        # Same file as last time: continue from the saved offset.
        seek = CONF['seek']
        next_file = 0
    else:
        # The live log was replaced: finish the rotated file first. Its name
        # is the saved prefix plus "-YYYYMMDD_HHM0" (minute rounded down to a
        # multiple of ten), built from a single clock reading.
        stamp = time.localtime()
        log_file = CONF['last_file'] + time.strftime("-%Y%m%d_", stamp) + time.strftime("%H%M", stamp)[:-1] + '0'
        next_file = 1
        seek = CONF['seek']

    print('NO2.log_file', log_file)
    value, stream, seek = read_log(log_file=log_file, seek=seek)

    # After draining a rotated file the next run starts the new live log at 0.
    if next_file:
        CONF['seek'] = 0
    else:
        CONF['seek'] = seek

    CONF['inode'] = os.stat(live_log).st_ino
    writeconf(CONF=CONF)
    print('last_CONF :{}'.format(CONF))

Additional knowledge: calling an external command from Logstash with the exec output

Without further ado, here is the code:

[elk@Vsftp logstash]$ cat  
# Pipeline: read events from stdin, extract a timestamp and a level from each
# line, and run an external command for events that look like errors.
input {
 stdin {
 } 
} 
filter {
 # Capture the leading ISO8601 timestamp into "time" and the next
 # whitespace-delimited token into "Level".
 grok {
 match => [ "message","(?m)\s*%{TIMESTAMP_ISO8601:time}\s*(?<Level>(\S+)).*"]
 }
 # Parse the captured "time" field using this format.
 date {
 match => ["time", "yyyy-MM-dd HH:mm:ss,SSS"]
 }
 # Tag the event, build "messager" as "<type>-<message>", then drop the raw
 # "message" field.
 mutate {
   add_field =>["type","tailong"]
   add_field =>["messager","%{type}-%{message}"]
   remove_field =>["message"]
  }
} 
output { 
 # Alert when Level is ERROR or the text contains "Exception", unless it
 # matches one of the three excluded phrases.
 if ([Level] == "ERROR" or [messager] =~ "Exception" ) and [messager] !~ "Winkin service not connected." and [messager] !~ "Error calling the Winkin Agent System interface." and [messager] !~ "BusinessException" {
 exec {
  # NOTE(review): the script name after "/bin/" was lost in this listing.
  # The event text is interpolated into the command line -- presumably run
  # through a shell, so untrusted log content should be checked; confirm.
  command => "/bin/ \"%{messager}\" \"%{type}\" "
 }
 }
 # Every event is also printed in rubydebug form.
 stdout { 
 codec =>rubydebug 
 } 
}
 
Vsftp:/root# cat /bin/ 
#!/usr/bin/perl 
use Net::SMTP;
use HTTP::Date qw(time2iso str2time time2iso time2isoz); 
use Data::Dumper;
use Getopt::Std;
use vars qw($opt_d );
# Parse the -d <value> option into $opt_d; parsed options are removed from @ARGV.
getopts('d:');
# mail_user (set inside send_mail) should be your full mail address,
# e.g. your_mail@<domain> -- the domain was stripped from this listing.
# $message: the remaining command-line arguments joined with spaces;
# send_mail uses it as the mail text.
 $message= "@ARGV";
# $env: value of -d; not used anywhere in the visible script.
 $env="$opt_d";
 # Send the alert text held in the global $message to one recipient via SMTP.
 #   Argument: the recipient address.
 #   Dies when the server cannot be reached or authentication fails.
 sub send_mail {
     my $to_address = shift;
     my $CurrTime   = time2iso(time());

     # NOTE(review): account, password and server below appear redacted or
     # truncated in this listing -- fill in real values before use.
     my $mail_user   = '@';
     my $mail_pwd    = 'xx';
     my $mail_server = 'smtp.';

     my $from    = "From: $mail_user\n";
     my $subject = "Subject: zjcap info\n";
     # Same bytes the original here-document produced: a leading space, the
     # timestamped text, and a newline.
     my $body    = " $CurrTime--$message\n";

     # new() returns undef on failure and leaves the reason in $@.
     my $smtp = Net::SMTP->new($mail_server)
         or die "Cannot connect to SMTP server '$mail_server': $@";

     # Report the server's reply; $! is not set by a failed auth().
     $smtp->auth($mail_user, $mail_pwd)
         or die "Auth Error! " . $smtp->message;
     $smtp->mail($mail_user);
     $smtp->to($to_address);

     $smtp->data();                # begin the data
     $smtp->datasend($from);       # From header
     $smtp->datasend($subject);    # Subject header
     $smtp->datasend("\n\n");      # blank line ends the headers
     $smtp->datasend("$body\n");   # content
     $smtp->dataend();
     $smtp->quit();
 }
 
# Send the alert to the hard-coded recipient.
# NOTE(review): the address appears redacted in this listing; only '@' remains.
send_mail ('@'); 
 
2017-01-12 10:19:19,888 jjjjj Exception
{
 "@version" => "1",
 "@timestamp" => "2017-01-12T02:19:19.888Z",
  "host" => "Vsftp",
  "time" => "2017-01-12 10:19:19,888",
  "Level" => "jjjjj",
  "type" => "tailong",
 "messager" => "tailong-2017-01-12 10:19:19,888 jjjjj Exception"
}

That is all I have to share about monitoring Logstash logs in real time with Python.