Python如何创建看门狗?本文通过使用看门狗和 pygtail 库在 Python 中创建看门狗,了解如何监视和监视文件系统的事件。
在软件开发中,应用程序日志起着关键作用。尽管我们希望我们的软件是完美的,但问题总是会出现,因此重要的是要有一个强大的监控和日志记录来控制和管理不可避免的混乱。
如今,应用程序支持工程师需要能够轻松访问和分析其应用程序和基础架构生成的大量日志数据。出现问题时,他们无法等待一两分钟,直到查询返回结果。无论他们收集和查询的数据量如何,他们都需要速度。
在本教程中,你将学习如何在Python中创建看门狗,包括Python创建看门狗示例;我们将解释如何检测特定目录中的更改(假设该目录托管你的应用程序日志)。每当发生变化时,都会及时处理修改或新创建的预定义类型文件,以检索符合指定模式的行。
另一方面,这些文件中与指定模式不匹配的所有行都被视为异常值,并在我们的分析中被丢弃。
我们将使用 看门狗和pygtail库来检测发生的变化,还有一个 Flask、Redis 和 SocketIO 版本,其中创建了一个 GUI 网络应用程序用于相同的目的,你可以随时参考这里。
处理流程图
首先,让我们安装要求:
$ pip3 install Pygtail==0.11.1 watchdog==2.1.1
Python如何创建看门狗?首先,让我们为我们的应用程序定义配置参数config.py
:
# Application configuration File
################################
# Directory To Watch, If not specified, the following value will be considered explicitly.
WATCH_DIRECTORY = "C:\\SCRIPTS"
# Delay Between Watch Cycles In Seconds
WATCH_DELAY = 1
# Check The WATCH_DIRECTORY and its children
WATCH_RECURSIVELY = False
# whether to watch for directory events
DO_WATCH_DIRECTORIES = True
# Patterns of the files to watch
WATCH_PATTERN = '.txt,.trc,.log'
LOG_FILES_EXTENSIONS = ('.txt', '.log', '.trc')
# Patterns for observations
EXCEPTION_PATTERN = ['EXCEPTION', 'FATAL', 'ERROR']
中的参数config.py
将是默认参数,稍后在脚本中,我们可以根据需要覆盖它们。
接下来,让我们定义一个检查机制,该机制将利用模块pygtail和re以根据EXCEPTION_PATTERN
我们刚刚定义的参数来查明观察结果config.py
:
import datetime
from pygtail import Pygtail
# Loading the package called re from the RegEx Module in order to work with Regular Expressions
import re
class FileChecker:
def __init__(self, exceptionPattern):
self.exceptionPattern = exceptionPattern
def checkForException(self, event, path):
# Get current date and time according to the specified format.
now = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
# Read the lines of the file (specified in the path) that have not been read yet
# Meaning by that it will start from the point where it was last stopped.
for num, line in enumerate(Pygtail(path), 1):
# Remove leading and trailing whitespaces including newlines.
line = line.strip()
# Return all non-overlapping matches of the values specified in the Exception Pattern.
# The line is scanned from left to right and matches are returned in the oder found.
if line and any(re.findall('|'.join(self.exceptionPattern), line, flags=re.I | re.X)):
# Observation Detected
type = 'observation'
msg = f"{now} -- {event.event_type} -- File = {path} -- Observation: {line}"
yield type, msg
elif line:
# No Observation Detected
type = 'msg'
msg = f"{now} -- {event.event_type} -- File = {path}"
yield type, msg
checkForException()
上面代码中定义的方法将摄取由看门狗模块的观察者类调度的事件(稍后会看到)。
给定目录中的任何文件更改都会触发这些事件,事件对象具有 3 个属性:
event_type
:作为字符串的事件类型(修改、创建、移动或删除)。is_directory
: 一个布尔值,指示是否为目录发出事件。src_path
:触发事件的文件系统对象的源路径。
Python创建看门狗示例 - 现在让我们定义我们的controller.py
,首先,让我们导入库:
# The Observer watches for any file change and then dispatches the respective events to an event handler.
from watchdog.observers import Observer
# The event handler will be notified when an event occurs.
from watchdog.events import FileSystemEventHandler
import time
import config
import os
from checker import FileChecker
import datetime
from colorama import Fore, init
init()
GREEN = Fore.GREEN
BLUE = Fore.BLUE
RESET = Fore.RESET
RED = Fore.RED
YELLOW = Fore.YELLOW
event2color = {
"created": GREEN,
"modified": BLUE,
"deleted": RED,
"moved": YELLOW,
}
def print_with_color(s, color=Fore.WHITE, brightness=Style.NORMAL, **kwargs):
"""Utility function wrapping the regular `print()` function
but with colors and brightness"""
print(f"{brightness}{color}{s}{Style.RESET_ALL}", **kwargs)
我们将使用colorama
文本颜色来区分不同的事件,有关colorama 的更多信息,请查看本教程。
如何在Python中创建看门狗?接下来,让我们定义我们的事件处理程序:
# Class that inherits from FileSystemEventHandler for handling the events sent by the Observer
class LogHandler(FileSystemEventHandler):
def __init__(self, watchPattern, exceptionPattern, doWatchDirectories):
self.watchPattern = watchPattern
self.exceptionPattern = exceptionPattern
self.doWatchDirectories = doWatchDirectories
# Instantiate the checker
self.fc = FileChecker(self.exceptionPattern)
def on_any_event(self, event):
now = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
# print("event happened:", event)
# To Observe files only not directories
if not event.is_directory:
# To cater for the on_move event
path = event.src_path
if hasattr(event, 'dest_path'):
path = event.dest_path
# Ensure that the file extension is among the pre-defined ones.
if path.endswith(self.watchPattern):
msg = f"{now} -- {event.event_type} -- File: {path}"
if event.event_type in ('modified', 'created', 'moved'):
# check for exceptions in log files
if path.endswith(config.LOG_FILES_EXTENSIONS):
for type, msg in self.fc.checkForException(event=event, path=path):
print_with_color(msg, color=event2color[event.event_type])
else:
print_with_color(msg, color=event2color[event.event_type])
else:
print_with_color(msg, color=event2color[event.event_type])
elif self.doWatchDirectories:
msg = f"{now} -- {event.event_type} -- Folder: {event.src_path}"
print_with_color(msg, color=event2color[event.event_type])
def on_modified(self, event):
pass
def on_deleted(self, event):
pass
def on_created(self, event):
pass
def on_moved(self, event):
pass
复制在LogHandler
从类继承类命名FileSystemEventHandler
的的看门狗库,主要覆盖的 on_any_event()
方法。
Python如何创建看门狗?下面是一些有用的方法,如果这个类:
on_any_event()
: 调用任何事件。on_created()
: 创建文件或目录时调用。on_modified()
: 当文件被修改或目录被重命名时调用。on_deleted()
: 删除文件或目录时调用。on_moved()
: 当文件或目录移动时调用。
为该on_any_event()
方法分配的代码将:
- 观察文件和目录。
- 验证受事件影响的文件的扩展名是否在
WATCH_PATTERN
内部 变量中预定义的扩展名中config.py
- 如果检测到,则生成说明事件或观察的消息。
Python创建看门狗示例:现在让我们编写我们的LogWatcher
类:
class LogWatcher:
# Initialize the observer
observer = None
# Initialize the stop signal variable
stop_signal = 0
# The observer is the class that watches for any file system change and then dispatches the event to the event handler.
def __init__(self, watchDirectory, watchDelay, watchRecursively, watchPattern, doWatchDirectories, exceptionPattern, sessionid, namespace):
# Initialize variables in relation
self.watchDirectory = watchDirectory
self.watchDelay = watchDelay
self.watchRecursively = watchRecursively
self.watchPattern = watchPattern
self.doWatchDirectories = doWatchDirectories
self.exceptionPattern = exceptionPattern
self.namespace = namespace
self.sessionid = sessionid
# Create an instance of watchdog.observer
self.observer = Observer()
# The event handler is an object that will be notified when something happens to the file system.
self.event_handler = LogHandler(watchPattern, exceptionPattern, self.doWatchDirectories)
def schedule(self):
print("Observer Scheduled:", self.observer.name)
# Call the schedule function via the Observer instance attaching the event
self.observer.schedule(
self.event_handler, self.watchDirectory, recursive=self.watchRecursively)
def start(self):
print("Observer Started:", self.observer.name)
self.schedule()
# Start the observer thread and wait for it to generate events
now = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
msg = f"Observer: {self.observer.name} - Started On: {now} - Related To Session: {self.sessionid}"
print(msg)
msg = (
f"Watching {'Recursively' if self.watchRecursively else 'Non-Recursively'}: {self.watchPattern}"
f" -- Folder: {self.watchDirectory} -- Every: {self.watchDelay}(sec) -- For Patterns: {self.exceptionPattern}"
)
print(msg)
self.observer.start()
def run(self):
print("Observer is running:", self.observer.name)
self.start()
try:
while True:
time.sleep(self.watchDelay)
if self.stop_signal == 1:
print(
f"Observer stopped: {self.observer.name} stop signal:{self.stop_signal}")
self.stop()
break
except:
self.stop()
self.observer.join()
def stop(self):
print("Observer Stopped:", self.observer.name)
now = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
msg = f"Observer: {self.observer.name} - Stopped On: {now} - Related To Session: {self.sessionid}"
print(msg)
self.observer.stop()
self.observer.join()
def info(self):
info = {
'observerName': self.observer.name,
'watchDirectory': self.watchDirectory,
'watchDelay': self.watchDelay,
'watchRecursively': self.watchRecursively,
'watchPattern': self.watchPattern,
}
return info
这是我们在LogWatcher
类上所做的:
- 创建
watchdog.observer
线程类的实例,观察者监视任何文件系统更改,然后将相应的事件分派给事件处理程序。 - 创建一个事件处理程序的实例
LogHandler
,它从FileSystemEventHandler
. 发生任何更改时会通知事件处理程序。 - 为我们的观察者分配一个时间表并定义其他输入参数,如要观看的目录、观看模式等。请注意,将
recursive
参数设置为 时True
,你必须确保你对子文件夹具有足够的访问权限。
最后,让我们使用argparse
以下代码围绕代码创建命令行参数:
def is_dir_path(path):
"""Utility function to check whether a path is an actual directory"""
if os.path.isdir(path):
return path
else:
raise NotADirectoryError(path)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Watchdog script for watching for files & directories' changes")
parser.add_argument("path",
default=config.WATCH_DIRECTORY,
type=is_dir_path,
)
parser.add_argument("-d", "--watch-delay",
help=f"Watch delay, default is {config.WATCH_DELAY}",
default=config.WATCH_DELAY,
type=int,
)
parser.add_argument("-r", "--recursive",
action="store_true",
help=f"Whether to recursively watch for the path's children, default is {config.WATCH_RECURSIVELY}",
default=config.WATCH_RECURSIVELY,
)
parser.add_argument("-p", "--pattern",
help=f"Pattern of files to watch, default is {config.WATCH_PATTERN}",
default=config.WATCH_PATTERN,
)
parser.add_argument("--watch-directories",
action="store_true",
help=f"Whether to watch directories, default is {config.DO_WATCH_DIRECTORIES}",
default=config.DO_WATCH_DIRECTORIES,
)
# parse the arguments
args = parser.parse_args()
# define & launch the log watcher
log_watcher = LogWatcher(
watchDirectory=args.path,
watchDelay=args.watch_delay,
watchRecursively=args.recursive,
watchPattern=tuple(args.pattern.split(",")),
doWatchDirectories=args.watch_directories,
exceptionPattern=config.EXCEPTION_PATTERN,
)
log_watcher.run()
Python创建看门狗示例:我们定义is_dir_path()
是为了确保输入的路径是一个有效的目录。让我们使用脚本:
--recursive
观察E:\watchdog
目录中发生的一切,包括子文件夹,我还指定了一种模式.txt,.log,.jpg,.png
,用于观看文本和图像文件。
然后我创建了一个文件夹并开始在文本文件上写入,然后移动图像并将其删除,看门狗正在捕获所有内容!
请注意,你可以选择在config.py
此处覆盖参数或在此处传递参数。
结论
Python如何创建看门狗?在我们深入探讨了看门狗和pygtail库的可用功能之后,我希望本文对你有所帮助。
值得注意的是,通过扩展所描述的功能,你可以在日志文件中发生致命错误时关联警报机制或播放声音。通过这样做,当一个观察被精确定位时,配置的工作流或警报将被自动触发。