Pandas 和 MySQL 搭配使用

Posted by Scott on November 17, 2016

背景

公司服务器托管在阿里云,系统为 CentOS,所有商品数据在其 MySQL 数据库中。

现在需要一个批量/自动化脚本能把采购经理整理的 Excel 数据一键上传到数据库。

解决方案

也许你会想「这个简单,把 Excel 转化为 SQL 语句不就好了」,且不说转化起来不容易,本身 Excel 的数据就不够干净,需先做数据处理与清洗。

最后,经过一番搜索,找到了 Python 的这个好用的 SSH Tunnel 包:

解决思路

  • 结合 pandas 写两个函数,一个用于查询数据库,一个用于推送数据到数据库。
  • 结合 sshtunnels 远程用 Python 访问数据库
  • 结合 docopt 等工具写成命令行脚本
  • 结合 pandas 写成数据清理、推送一体化脚本。

具体实现

sshtunnels 顾名思义就是 SSH tunnels to remote server(SSH 连接远程服务器)。

原理

----------------------------------------------------------------------

                            |
-------------+              |    +----------+
    LOCAL    |              |    |  REMOTE  | :22 SSH
    CLIENT   | <== SSH ========> |  SERVER  | :8080 web service
-------------+              |    +----------+
                            |
                         FIREWALL (only port 22 is open)

----------------------------------------------------------------------

大意就是只通过一个 ssh 通道就能访问 8080 端口的服务。

连接数据库

import os
import pandas as pd

from sqlalchemy import create_engine
from sshtunnel import SSHTunnelForwarder


HOSTNAME = '127.0.0.1'  # host
DATABASE = database  # 数据库名,
USERNAME = usename  # 数据库用户名
PASSWORD =  password  # 数据库密码
HOME_PATH = os.getenv('HOME')  # 获取 ~ 目录地址
PKEY_PATH = os.path.join(HOME_PATH, '.ssh', 'id_rsa')


def zkquery(q):
    """The function is support for quering zokoodb.

    Args:
    - q: quering string
    Return:
    - A DataFrame
    """
    with SSHTunnelForwarder(
        (remoteip, port),  # 换成你的远程 ip 和 ssh port
        ssh_username=username,
        ssh_pkey=PKEY_PATH,  # sshkey 地址
        remote_bind_address=('localhost', 3306)
    ) as server:
        engine = create_engine(
            'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(
                USERNAME, PASSWORD, HOSTNAME,
                server.local_bind_port, DATABASE
                ), echo=False)
        return pd.read_sql_query(q, engine)

通过上方 30 行左右的代码就能读取数据库并返回一个 DataFrame,同理写个函数推送 csv 或 Excel 数据至数据库。

def df2zkdb(df, table):
    with SSHTunnelForwarder(
        (remoteip, port),
        ssh_username=username,
        ssh_pkey=PKEY_PATH,
        remote_bind_address=('localhost', 3306)
    ) as server:
        engine = create_engine(
            'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(
                USERNAME, PASSWORD, HOSTNAME,
                server.local_bind_port, DATABASE
                ), echo=False)
        df.to_sql(name=table, con=engine, if_exists='append', index=False)
        print("You have successfully pushed {0} records "
              "to the {1} table of zokoodb.".format(df.shape[0], table))
        print("Done!!!")

命令行脚本

# -*- coding: utf-8 -*-
import sys
from zkMySQL import zkquery


def cli():
    try:
        df = zkquery(sys.argv[1])
        return df.to_csv(sys.stdout, index=False)
    except (BrokenPipeError, IOError):
        pass
    sys.stderr.close()


if __name__ == '__main__':
    cli()

效果:

$ zkquery 'select dt_add, id from goods' | head | csvlook
|----------------------+-------|
|  dt_add              | id    |
|----------------------+-------|
|  2016-07-07 16:22:41 | 1948  |
|  2016-07-07 17:27:46 | 1949  |
|  2016-07-26 12:48:39 | 1953  |
|  2016-07-18 15:08:50 | 1950  |
|  2016-07-18 15:37:04 | 1951  |
|  2016-07-25 17:50:24 | 1952  |
|  2016-08-03 14:31:43 | 1954  |
|  2016-08-04 16:11:11 | 1956  |
|  2016-08-08 10:29:39 | 1957  |
|----------------------+-------|

同理写个推送用的,效果:

$ df2zkdb -h
df2zkdb
A script to push DataFrame to zokoodb.

Usage:
    df2zkdb <input> -t <table>

Options:
    -h --help   Show this screen.
    -t --table  to table.

$ df2zkdb goods_temp.csv -t goods_temp
You have pushed 4 records to the goods_temp table of zokoodb.
Done!!!

References