
Big Data: Analyzing the MySQL General Log with Hadoop Streaming

Preface:

I had been using the native Java API for MapReduce jobs, but calling packages written in other languages (e.g. Python) turned out to be inconvenient, so I decided to switch to Hadoop Streaming.


I. Task scenario

Parse the MySQL general log, extract the SQL statements, and check each statement for SQL injection.


II. Background

MySQL general log format:

[timestamp]\t[thread_id]\s[command_type]\t[sql_text]\n

The corresponding regex:

db_pattern = r"^(\d{2}\d{2}\d{2}\s+\d{1,2}:\d{2}:\d{2}|\t)\s+\d+\s+([A-Za-z]+)\s*(.*)$"

Example:

160114 11:15:02   903 Query     BEGIN


                  903 Query     REPLACE INTO heartbeat SET id='abc_0000', value=142341302

Parsing issues:

(1) Timestamp filling: MySQL prints a "\t" instead of a timestamp when the time is the same as the previous entry, so missing timestamps have to be filled in from the preceding entry.

(2) Multi-line SQL concatenation: a single SQL statement can span several lines, but Hadoop MapReduce's default FileInputFormat for text is TextInputFormat, which splits records on \n, so the pieces have to be stitched back together (see the sketch below).
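
To make the format concrete, here is a minimal standalone sketch (not part of the job code) that applies the regex above to the two sample entries; note that the second entry's timestamp group comes back as "\t", meaning "same time as the previous entry":

# -*- coding: utf-8 -*-
# Standalone sketch: apply the general-log regex to the two sample entries above.
import re

db_pattern = r"^(\d{2}\d{2}\d{2}\s+\d{1,2}:\d{2}:\d{2}|\t)\s+\d+\s+([A-Za-z]+)\s*(.*)$"
db_regex = re.compile(db_pattern)

samples = [
    "160114 11:15:02   903 Query     BEGIN",
    "\t  903 Query     REPLACE INTO heartbeat SET id='abc_0000', value=142341302",
]

for line in samples:
    match = db_regex.search(line)
    if match:
        timestamp, command, sql = match.groups()
        # the second entry yields timestamp == "\t": reuse the previous entry's time
        print repr(timestamp), command, sql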


III. Hadoop Streaming coding

Hadoop Streaming is used to concatenate the SQL statements and fill in the timestamps in the MySQL log.

1. How Hadoop Streaming works

Standard streams are the interface between Hadoop and the user program: the program reads records from stdin and writes results to stdout, with Hadoop sorting the intermediate output between the map and reduce stages, much like a Linux pipeline.
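
Concretely, a Streaming mapper or reducer is just any executable that reads lines from stdin and writes "key<TAB>value" lines to stdout; Hadoop sorts the mapper output by key before feeding it to the reducer. A minimal hypothetical sketch of the contract (not part of this job):

#!/usr/bin/env python
# Hypothetical wordcount-style Streaming mapper, only to illustrate the contract:
# read raw lines from stdin and emit "key<TAB>value" lines on stdout.
import sys

for line in sys.stdin:
    for word in line.split():
        print "%s\t%d" % (word, 1)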


Advantages: you can do the work in a language you are already comfortable with.

Disadvantages:

(1) The reducer has to iterate over and group its input itself; unlike the native Java API, it does not receive <k, list<v>> (see the sketch below).

(2) For deep customization you still have to write Java.
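
For point (1), the usual trick is to rely on the fact that the reducer's stdin is already sorted by key, so itertools.groupby can rebuild the <k, list<v>> view by hand; the real reducer in section 3 below does exactly this. A minimal hypothetical sketch:

#!/usr/bin/env python
# Hypothetical sketch: rebuild <k, list<v>> from the sorted "key<TAB>value"
# lines a Streaming reducer receives on stdin.
import sys
from itertools import groupby
from operator import itemgetter

pairs = (line.rstrip("\n").split("\t", 1) for line in sys.stdin)
for key, group in groupby(pairs, itemgetter(0)):
    values = [value for _, value in group]
    print "%s\t%d" % (key, len(values))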

Differences:

(1) With the native Java API a job can get by with only a reducer (no mapper class is set in the snippet below, so the default identity mapper is used), whereas a Streaming job must explicitly specify a -mapper, even if it is only IdentityMapper:

// native Java: no mapper class is set, only the reducer
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ParserRequestWritable.class);
job.setNumMapTasks(0);
job.setReducerClass(UniqueReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ParserRequestWritable.class);

The Streaming equivalent has to name a mapper explicitly:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc


2. Hadoop Streaming environment setup

Note: the location of the streaming jar differs between Hadoop versions.

vim ~/.bashrc

HADOOP_HOME=/home/{user}/hadoop-2.6.0
alias HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar"

source ~/.bashrc

View the Hadoop Streaming usage options:

HSTREAMING -info

3. Hadoop Streaming programming

Python is the language of choice here because it is data-science friendly (R would work as well).

Since third-party algorithm libraries are always needed, and the Python versions installed on the Hadoop cluster nodes may differ, it is best to package the entire development/runtime environment and upload it to Hadoop.

Layout of the complete package:

lib/Python27        the Python installation and its libraries
src/main, src/test  source code
data                data files

The most stable release, Python 2.7, is used.


(1) Compiling Python

wget https://www.python.org/ftp/python/2.7.10/Python-2.7.10.tgz
tar zxvf Python-2.7.10.tgz
cd Python-2.7.10

# compile and install into the package's lib/Python27 directory
# (--prefix must be an absolute path; adjust it to where lib/Python27 actually lives)
./configure --prefix="$(pwd)/../lib/Python27"
make -j
make install

(2) Writing the mapper, normalize_mapper.py

Role: parse the raw log into <SQL_BASE64, TIMESTAMP_BASE64> pairs (base64 is used so that tabs and newlines inside the SQL cannot break the tab-separated key/value records).

# -*- coding: utf-8 -*-

import sys
import re
import logging
import base64


# db log format configure
db_pattern = r"^(\d{2}\d{2}\d{2}\s+\d{1,2}:\d{2}:\d{2}|\t)\s+\d+\s+([A-Za-z]+)\s*(.*)$"
db_regex = re.compile(db_pattern)

sql_pattern = r"^(\S+)\s"
sql_regex = re.compile(sql_pattern)


# log configure: used to debug the parsing, e.g. whether multi-line SQL is merged correctly
logging.basicConfig(level = logging.ERROR,
                    format = '%(message)s',
                    stream = sys.stderr)


# command filter: only Query entries are processed
command_blacklist = [
    "query"
]

# statement filter: only these SQL verbs are kept
query_blacklist = [
    "select",
    "update",
    "insert",
    "delete",
    "replace"
]


def read_mapper_output(file):
    """
    read data from file using yield
    """
    for line in file:
        yield line.rstrip()


def db_log_normailze():
    """
    normalize db log, extend timestamp and merge multi-line sql statement
    """
    # read data from stdin
    data = read_mapper_output(sys.stdin)

    # timestamp of the previous entry, used to fill in "\t" placeholders
    last_time = "\t"

    # current time, command and sql
    time = ""
    command = ""
    sql = ""
    line_number = 1

    for line in data:
        db_match = db_regex.search(line)
        if db_match:
            # a new entry starts: emit the previously accumulated statement
            if command != "":
                if sql and command.lower() in command_blacklist:
                    sql_match = sql_regex.search(sql)
                    if sql_match:
                        sql_command = sql_match.group(1)
                        if sql_command.lower() in query_blacklist:
                            debug = "FINAL_RESULT %d: %s %s %s" %(line_number - 1, time, command, sql)
                            logging.debug(debug)
                            sql_base64 = base64.b64encode(sql)
                            time_base64 = base64.b64encode(time)
                            print "%s\t%s" %(sql_base64, time_base64)
            else:
                info = "NULL_COMMAND %d: %s %s %s" %(line_number - 1, time, command, sql)
                logging.info(info)

            time, command, sql = db_match.groups()

            # time extend: "\t" means the same timestamp as the previous entry
            if time == "\t":
                time = last_time
            else:
                last_time = time
        else:
            # for debug: this physical line continues a multi-line statement
            info = "MULTI_LINE %d: %s" %(line_number, line.strip())
            logging.info(info)

            if command != "":
                sql = sql + line

        line_number = line_number + 1

    # flush the last accumulated statement once the input is exhausted
    # (otherwise the final entry of each split would be silently dropped)
    if command != "" and sql and command.lower() in command_blacklist:
        sql_match = sql_regex.search(sql)
        if sql_match and sql_match.group(1).lower() in query_blacklist:
            print "%s\t%s" %(base64.b64encode(sql), base64.b64encode(time))


if __name__ == '__main__':
    db_log_normailze()

(3) Writing the reducer, normalize_reducer.py

The previous step emits <SQL_BASE64, TIMESTAMP_BASE64> pairs; the reducer then wants the data in the form <SQL_BASE64, List[TIMESTAMP_BASE64]>. Unfortunately, with Streaming you have to do that grouping yourself. The code below only demonstrates how to group.

#!/usr/bin/env python
import sys
import re
import base64
import logging
from itertools import groupby
from operator import itemgetter
import sqli_check
import udf_tool


#log configure
logging.basicConfig(level = logging.ERROR,
                    format = '%(message)s',                    
                    stream = sys.stderr)


def read_mapper_output(file, separator='\t'):
    """
    read data from file and split each line into k,v pair
    """
    for line in file:
        yield line.strip().split(separator, 1)


def db_log_sql_parse():    
    data = read_mapper_output(sys.stdin, separator='\t')

    for sql_base64, group in groupby(data, itemgetter(0)):
        num_of_request = 0
        time_list = set()
        """
        k: sql_base64
        v: time_base64
        """
        for k, v in group:
            time = base64.b64decode(v)
            num_of_request = num_of_request + 1
            time_list.add(time)
   
        # run the SQL injection check (sqli_check is the author's own module)
        sql_parser_result = sqli_check.parser(sql_base64)


if __name__ == '__main__':
    db_log_sql_parse()

(4) Local test

The sort -k1 step stands in for Hadoop's shuffle/sort between the map and reduce stages:

cat ../../data/mysql.log | ../../lib/Python27/bin/python normalize_mapper.py | sort -k1 | ../../lib/Python27/bin/python normalize_reducer.py 1>result 2>debug &

(5) Running on the cluster

(1) Package and upload the code environment

tar zcvf sqlicheck.tar.gz lib/ src/

hadoop fs -put sqlicheck.tar.gz /

(2) Run the job

The -archives / -cacheArchive option distributes sqlicheck.tar.gz to every node and unpacks it under the symlink name given after the #, which is why the mapper and reducer paths below start with sqlicheck/.

a. Hadoop 2.x

HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar"

$HSTREAMING \
-D mapred.job.name='normalize db log' \
-archives "hdfs://xxx.xxx.xxx.xxx:xxx/sqlicheck.tar.gz#sqlicheck" \
-input $INPUT_PATH \
-output $OUTPUT_PATH \
-mapper "sqlicheck/lib/Python27/bin/python sqlicheck/src/main/normalize_mapper.py" \
-reducer "sqlicheck/lib/Python27/bin/python sqlicheck/src/main/normalize_reducer.py"

b. Hadoop 1.x

$HADOOP_HOME/bin/hadoop --config $HADOOP_HOME/conf streaming \
-D mapred.job.name='normalize db log' \
-input $INPUT_PATH \
-output $OUTPUT_PATH \
-mapper "sh sqlicheck/src/main/normalize_mapper.sh" \
-reducer "sh sqlicheck/src/main/normalize_reducer.sh" \
-cacheArchive /sqlicheck/sqlicheck.tar.gz#sqlicheck

Why .sh wrappers instead of the .py scripts directly? There is an unfortunate reason, explained below.


IV. Troubleshooting

1. Missing third-party library dependencies for Python on the Hadoop cluster

/lib64/libc.so.6: version `GLIBC_2.14' not found

When this error appears, the Python environment you uploaded is missing shared-library dependencies on the cluster nodes (their glibc is older than the one Python was built against).

The fix is to list Python's shared-library dependencies with ldd, then package and upload them to Hadoop as well:

ldd lib/Python27/bin/python

cp /lib/x86_64-linux-gnu/libpthread.so.0 lib/Python27/lib/
cp /lib/x86_64-linux-gnu/libdl.so.2 lib/Python27/lib/
cp /lib/x86_64-linux-gnu/libutil.so.1 lib/Python27/lib/
cp /lib/x86_64-linux-gnu/libm.so.6 lib/Python27/lib/
cp /lib/x86_64-linux-gnu/libc.so.6 lib/Python27/lib/

Before running python, export LD_LIBRARY_PATH so it points at the bundled libraries:

vim normalize_mapper.sh

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:sqlicheck/lib/Python27/lib
sqlicheck/lib/Python27/bin/python sqlicheck/src/main/normalize_mapper.py

This is why the job invokes .sh wrappers rather than the .py scripts directly (normalize_reducer.sh does the same for normalize_reducer.py).


