Airflow执行失败后添加微信告警

Airflow默认带有邮件了Slack告警方式,都是不太符合国情,要想第一时间收到告警信息,肯定是微信最为方便。
只需添加几行代码就能给Airflow带上微信模板信息推送功能,先决条件是需要申请微信公众号与开通模板信息功能。

一、安装 weixin-python的工具包

pip intall weixin-python

二、在DAG上添加执行失败回调与回调方法

def send_wechat(context):
    mp = WeixinMP(app_id='APP_ID', app_secret='APP_SECRET')
    mp.template_send(template_id='TEMPLATE_ID',
                     touser='USER_ID', data={
            "first": {
                "value": u"ARIFLOW执行告警!",
            },
            "content": {
                "value": u"ARIFLOW的DAG执行出错"
            },
            "occurtime": {
                "value": str(datetime.datetime.now())
            }
        })
on_failure_callback=send_wechat

收工。

CentOS使用pyenv优雅的安装Python3.6.8

由于CentOS默认安装了Python2.7,有时候我们需要Python3.6或者其他Python版本,这时候就需要升级Python2.7到Python其他版本,网上一大堆教程异常复杂。

第一步、安装pyenv

如果没有安装git,则安装

yum install git	

安装后执行命令,下载pyenv

git clone git://github.com/yyuu/pyenv.git ~/.pyenv

第二步、修改.bashrc

在末尾添加如下内容

export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"

添加后执行

source ~/.bashrc

第三步、安装开发工具

yum groupinstall "Development Tools"  -y
yum install -y python-devel libevent-devel python-pip gcc xz-devel openssl-devel readline-devel sqlite-devel bzip2-devel

第四步、查看可用的Python列表,并选择安装

pyenv install --list
pyenv install 3.6.8

第五步、切换Python版本

pyenv versions
pyenv global 3.6.8
pyenv rehash


可以通过的 pyenv global 版本号 反复切换Python。该方法同样适用MacOS。

Airflow的DAG并行度控制

DAG中有两个初始化配置 concurrency 和 max_active_runs
concurrency:表示一个DAG,在同一时间点最大可以运行多少个Task。
max_active_runs:表示一个DAG,在同一时间点最多可以被运行几个。

假如我们一个DAG同一时间只能被运行一次,那么一定要指明 max_active_runs = 1
如果我们DAG中有10个Task,我们如果希望10个Task可以在触发后可以同时执行,那么我们的concurrency需要不小于10才行,若小于10,那么会有任务需要等待之前的任务执行完成才会开始执行。

就像上图,一个DAG被同时触发了2次,但是同时只有1个DAG的2个Task执行,另外一个DAG的两个Task被堵塞,等待资源的释放。

CentOS上Airflow安装部署流程

第一步、安装python开发包,用于编译Python第三方模块

yum install python-devel

第二步、安装Python的Mysql客户端模块

pip install mysqlclient

第三步、安装airflow、包括加密组件、用户登录组件

export AIRFLOW_HOME=~/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes
pip install apache-airflow
pip install apache-airflow[mysql]
pip install apache-airflow[crypto]
pip install apache-airflow[password]

第四步、修改airflow.conf

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://root:xxxxxx@localhost:3306/airflow

第五步、彻底卸载Mariadb,安装Mysql5.7
彻底卸载老的,卸载前请备份数据

mysqldump -u root -p --all-databases > alldb.sql
yum remove mysql MySQL-server MySQL-shared MySQL-shared-compat MariaDB-server MariaDB-client
rpm -qa|grep mariadb
rpm -e mariadb-embedded-devel-5.5.60-1.el7_5.x86_64
rpm -e mariadb-devel-5.5.60-1.el7_5.x86_64
rpm -e mariadb-embedded-5.5.60-1.el7_5.x86_64
rpm -e --nodeps mariadb-libs-5.5.60-1.el7_5.x86_64

添加Mysql5.7的yum

vim /etc/yum.repos.d/mysql-community.repo
wget https://repo.mysql.com//mysql80-community-release-el7-2.noarch.rpm
rpm -Uvh mysql80-community-release-el7-2.noarch.rpm
yum install mysql-community-server
systemctl start mysqld
systemctl status mysqld

mysql-community.repo内容如下:
[mysql57-community]
name=MySQL 5.7 Community Server
baseurl=http://repo.mysql.com/yum/mysql-5.7-community/el/7/$basearch/
enabled=1
gpgcheck=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql

上面步骤后Mysql5.7安装完成!
查看Mysql安装后的临时密码,并修改密码,并创建数据库airflow

grep 'temporary password' /var/log/mysqld.log
mysql -uroot -p
ALTER USER 'root'@'localhost' IDENTIFIED BY 'xxxxxx'
FLUSH  PRIVILEGES;
CREATE DATABASE IF NOT EXISTS airflow  DEFAULT CHARSET utf8;

修改my.cnf

vim /etc/my.cnf

添加 explicit_defaults_for_timestamp = 1
重启Mysql

systemctl restart mysqld

第六步、初始化airflow数据库、启动web

airflow initdb

第七步、配置web登录

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

执行下面python初始化一个用户

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'youfangxi'
user.email = '951868171@qq.com'
user.password = 'xxxxxx'
# 超级管理员添加 user.superuser = True
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

启动过程中若遇到MySQLdb ImportError: libmysqlclient.so.18  问题,可通过下面步骤解决

yum provides */libmysqlclient.so.18

查看那些包提供 libmysqlclient.so.18 然后安装

yum install mysql-community-libs-compat-5.7.25-1.el7.x86_64

这样最基本的一个带用户认证的airflow就基本安装完成!

使用row_number函数实现连续内容分组

有如下数据,需要获取每个EVENT持续的时间段

ID EVENT CREATED_AT
1 EVENT1 2018-06-15 15:34:04
2 EVENT1 2018-06-15 15:34:08
3 EVENT1 2018-06-15 15:40:04
4 EVENT2 2018-06-15 15:41:04
5 EVENT2 2018-06-15 15:42:04
6 EVENT2 2018-06-15 15:43:05
7 EVENT1 2018-06-15 15:44:04
8 EVENT1 2018-06-15 15:44:08
9 EVENT3 2018-06-15 15:45:04
10 EVENT3 2018-06-15 15:46:04
11 EVENT3 2018-06-15 15:47:05

这种情况下可以通过窗口函数row_number() 实现,SQL大致如下:

SELECT EVENT, MIN(CREATED_AT) AS STARED_AT, MAX(CREATED_AT) AS ENDED_AT
FROM (
       SELECT ID,
              EVENT,
              CREATED_AT,
              ROW_NUMBER() OVER(PARTITION BY EVENT ORDER BY ID) ROW_NO
       FROM T_EVENT) as T
GROUP BY EVENT, ID - ROW_NO

查询后的结果如下:

EVENT STARTED_AT ENDED_AT
EVENT1 2018-06-15 15:34:04 2018-06-15 15:40:04
EVENT1 2018-06-15 15:44:04 2018-06-15 15:44:08
EVENT2 2018-06-15 15:41:04 2018-06-15 15:43:05
EVENT3 2018-06-15 15:45:04 2018-06-15 15:47:05

使用XShell的隧道功能实现网络穿透

通常我们开发都是在自己电脑环境下开发,遇到需要与外部网络调试时将非常不方便,比如微信公众号开发,物联网开发等场景;如果可以将服务器的请求转发到自己的电脑上那么这些问题都迎刃而解;这时候我们可以使用xshell的隧道功能,将服务器的某个端口转发到自己电脑上

0x0,安装好Xshell并配置好服务器IP

0x1,配置隧道

如图:类型(方向)选择远程传入,源主机0.0.0.0,目标主机为局域网IP,千万不用使用127.0.0.1否则接收不到;

配置完成后我们链接到服务器用 netstat 命令可以查看

如图,服务器在0.0.0.0上监听了9999端口,若显示的是127.0.0.1:9999则还需要修改 sshd_conf文件

vim /etc/ssh/sshd_config

找到GatewayPorts 去掉注释将 no 改成 yes

GatewayPorts yes

再执行 systemctl restart sshd.service 重启sshd 服务,关闭Xshell再次进入后执行 netstat  命令将显示0.0.0.0:9999 ;

这样所有服务器9999端口的请求都会转发到Xshell所在局域网的192.168.188.125的9999端口上;

如何使用 ZeroBrane Studio 调试嵌入式 Lua

C++调用Lua如何调试lua代码呢,可以使用 mobdebug.lua + ZeroBrane Studio 进行调试

0x00:工具下载

mobdebug.lua 项目地址:https://github.com/pkulchenko/MobDebug

ZeroBrane Studio 项目地址:https://github.com/pkulchenko/ZeroBraneStudio

ZeroBrane Studio 官网地址:https://studio.zerobrane.com/

在官网上找到对应的操作系统版本下载

0x01:实例操作

C++代码

#include "lua.hpp"

int main() {
    lua_State *L =  luaL_newstate();
    luaL_openlibs(L);
    luaopen_debug(L);
    luaL_dofile(L,"a.lua");
    lua_pcall(L,0,0,0);
    lua_close(L);
    return 0;
}

通过C++代码调用a.lua,输出“hello world”

a.lua

require('mobdebug').start()
function hello(l)
    print("hello world")
end
hello(1);

代码很简单,只要在需要调试的lua文件上加上require(‘mobdebug’).start()这行代码。并且将 mobdebug.lua 文件,放到lua代码目录下,如下图:

打开ZeroBrane Studio,将lua代码目录设置成project,在需要调试的地方打上断点。

在Project菜单,选择 Start Debugger Server即可。

然后直接执行C++代码;触发断点会在ZeroBrane中显示,就是这么简单

主要窗口就是 Stack和Watch窗口,一个显示调用栈,一个用来查看当前变量的值信息。

通过调整 require(‘mobdebug’).start(‘192.168.1.110’) 的IP参数OR端口,可实现调试嵌入移动端的Lua