故意踩坑式写脚本 —— 简单检测 TCP 连接

释放双眼,带上耳机,听听看~!

墙换成 TCP 阻断的模式以后,就没法想以前那样用多地 ping 的方式检测,只好自己手写一个 TCP 连接来判断。本来可以很轻松的搞定,但写的时候故意放了一些啰嗦的东西在里面,强迫自己多了解几个不同的场景。

成品

日志记录展开目录

这是一个非常常见的场景,但是以前的写法都不太好,这次总结一个比较方便也能利用到日志模块功能的写法。

写一个 log 类,比如放在 utils/log.py 里。

import logging
import logging.config
import os

try:
    from settings import LOG_TYPE
except ImportError:
    print('Copy settings.example.py to settings.py first!')
    exit(code=1)


class Logger(object):
    def __init__(self, logger):
        """
        封装 log 类,用来统一获取 logger
        :param logger: 传入 logger 名称,可用 __name__ 代替
        """
        log_file = 'logs/wt.log'
        log_dir = os.path.split(log_file)[0]
        if not os.path.exists(log_dir):
            os.mkdir(log_dir)

        logging_dict = {
            'version': 1,
            'disable_existing_loggers': False,
            'loggers': {
                '': {
                    'level': 'DEBUG',
                    'handlers': LOG_TYPE,
                },
            },
            'formatters': {
                'colored_console': {
                    '()': 'coloredlogs.ColoredFormatter',
                    'format': "[%(asctime)s] [%(name)s] [line:%(lineno)s] [%(levelname)s] %(message)s",
                    'datefmt': '%Y-%m-%d %H:%M:%S',
                },
                'format_for_file': {
                    'format': "[%(asctime)s] [%(name)s] [line:%(lineno)s] [%(levelname)s] %(message)s",
                    'datefmt': '%Y-%m-%d %H:%M:%S',
                }
            },
            'handlers': {
                'console': {
                    'level': 'INFO',
                    'class': 'logging.StreamHandler',
                    'formatter': 'colored_console',
                },
                'file': {
                    'level': 'DEBUG',
                    'class': 'cloghandler.ConcurrentRotatingFileHandler',
                    'formatter': 'format_for_file',
                    'filename': log_file,
                    'encoding': 'utf-8',
                    'delay': True,
                    'maxBytes': 1024 * 1024 * 100,
                    'backupCount': 5,
                }
            }
        }

        logging.config.dictConfig(logging_dict)
        logger = logging.getLogger(logger)

        # requests 不打印 log
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)

        self.logger = logger

    def get_logger(self):
        return self.logger

日志类有两个依赖。

coloredlogs
ConcurrentLogHandler

一个是用来在控制台彩色输出,另一个是用来进程安全地写日志。

首先是创建了存放日志的文件夹,然后在 logging_dict 中,定义了两个 handler,用于控制台和文件输出,两个 handler 的 formatter 也做了定义,即 colored_console 和 format_for_file

                'colored_console': {
                    '()': 'coloredlogs.ColoredFormatter',
                    'format': "[%(asctime)s] [%(name)s] [line:%(lineno)s] [%(levelname)s] %(message)s",
                    'datefmt': '%Y-%m-%d %H:%M:%S',
                }

这里是为了引用 coloredlogs 自己的类来定义 formatter,颜色输出都是库默认的。

        logging.config.dictConfig(logging_dict)
        logger = logging.getLogger(logger)

        self.logger = logger
    def get_logger(self):
        return self.logger

这里用类初始化的参数定义了 logger 的名字,用 get_logger 方法返回。

        # requests 不打印 log
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)

requests 库会自己开始打日志,很多都没有必要,所以调整一下日志的级别。

在别的地方引入日志类,自行初始化。

logger = Logger(__name__).get_logger()
[2018-07-21 21:32:40] [__main__] [line:58] [INFO] xx.xxx TCP test success.
[2018-07-21 21:32:40] [__main__] [line:48] [INFO] xxxxxxx ICMP test success.
[2018-07-21 21:32:40] [__main__] [line:48] [INFO] xxxxxxxxx ICMP test success.
[2018-07-21 21:32:44] [__main__] [line:32] [ERROR] Notify error: {'errno': 1024, 'errmsg': '不要重复发送同样的内容'}

通知推送展开目录

内容复杂的可以用邮件推送,除了 SMTP 以外还可以用一些邮件服务商,比如 MailGun 之类。如果比较简单就可以用 ServerChan,直接推送到微信。

任务调度展开目录

本来一个 crontab 就能做的很好的事情,我就是想故意踩个坑,所以用了 Python 来实现,这里用了 apscheduler 库。

apscheduler 分了几个模块,分别是 Triggers,Jobstores,Executors,Schedulers。

Triggers 就是触发器,常用的有 date,interval 和 cron 三种,date 用来执行单次的任务,interval 执行间隔时间的任务,cron 执行的是 crontab 的规则。

Schedulers 是调度器,常用的其实就两种。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler

一种是阻塞式的,另一种是添加了任务后就放到后台运行。

添加任务可以用装饰器的方式。

sched = BlockingScheduler()

@sched.scheduled_job(trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=1), id='first-run')
@sched.scheduled_job(trigger='cron', hour='10,20')
def main():

添加一个单次任务是因为调度的任务在程序跑第一次的时候并不会运行。。不便调试。。。

还可以写一个异常处理,免得接不住异常导致一些奇奇怪怪的问题。

def sched_listener(event):
    """
    apscheduler 事件监听
    :param event:
    :return:
    """
    if event.exception:
        notify('定时任务出错', key=SC_KEY)
        logger.error('定时任务出错 {e}'.format(e=event.exception))


sched.add_listener(sched_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

这样抛了异常也可以推送一条通知过来。

部署展开目录

nohup 直接跑也行,我自己用了 supervisor。

[program:wt]
directory=/home/user/wt/
command=/home/user/.pyenv/shims/python3 wt.py
stdout_logfile=none
stderr_logfile=none
user=user
autostart=true
autorestart=true

logfile 配成 none 是因为自己写了日志,不需要再写一遍了,command 里最好用绝对路径。

声明:本站只作网络安全交流之用。严禁发布低俗、违法、涉及政治的内容。如若本站内容侵犯了原著者的合法权益,可联系我们[181-1319-3168]进行及时处理。

给TA打赏
共{{data.count}}人
人已打赏
漏洞播报

PHP-FPM配置

2020-1-28 1:02:36

新闻中心

从 MySQL 出发的反击之路

2020-1-28 1:06:18

免责声明:红客联盟致力于保护作者版权,部分文字/图片来自互联网,无法核实真实出处,如涉及版权问题,请及时联系我们删除[181-1319-3168]。从该公众号转载本文至其他平台所引发一切纠纷与本站无关。支持原创!
0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索