Python 编写 CLI 的技巧分享

让 Python 编写的 CLI 支持后台运行、切换 root 、报错自动进入 debug 模式等等好用的功能!

CLI 即 command-line interface,简单理解就是命令行程序啦。我们熟知的 SQLmap、Nmap 等都是 CLI 形式的。与 CLI 对应的一个概念还有 GUI,比如 QQ、微信 客户端,都是 GUI 形式的。由于 CLI 才是本篇的要讲的,所以 GUI 就不再多说了。

开胃小菜

获取当前时间

1
2
def get_now():
return datetime.datetime.now().strftime('%Y-%m-%d %X')

这个代码倒是非常简单与常见,主要是 strftime 里的参数,很多人会用 %Y-%m-%d %H:%M:%S,其实 %H:%M:%S 可以用 %X 替代。

其实还可以更精简,只要你习惯这种格式的日期:

1
2
In [1]: datetime.datetime.now().strftime('%x %X')
Out[1]: '11/11/20 16:13:59'

更多占位符可以参考:

https://docs.python.org/zh-cn/3/library/datetime.html#strftime-and-strptime-format-codes

把 n 秒转为合适的时间单位

1
2
3
4
5
6
7
8
9
10
11
12
def timer_unit(s):
'''将 second 秒转为适合的单位
'''

if s <= 1:
return f'{round(s, 1)}s'

num, unit = [
(i, u) for i, u in ((s / 60**i, u) for i, u in enumerate('smhd')) if i >= 1
][-1]

return f'{round(num, 1)}{unit}'

其实这个之前分享过,可以参考之前的《拾遗录第 1 期》:

1
2
In [1]: timer_unit(1000)
Out[1]: '16.7m'

给你点颜色瞧瞧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from colorama import Fore, Style


def put_color(string, color, bold=False):
'''
give me some color to see :P
'''

if color == 'gray':
COLOR = Style.DIM + Fore.WHITE
else:
COLOR = getattr(Fore, color.upper(), "WHITE")

return f'{Style.BRIGHT if bold else ""}{COLOR}{str(string)}{Style.RESET_ALL}'

这个很简单,没啥好说的。

一份漂亮的 log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import sys


class Logger:
def __init__(self, filename, stdout=False, verb=2):
self.stdout = stdout
self.verbose = verb
self.filename = filename

def _log(self, log_content):
if self.stdout:
sys.stderr.write('\r'+log_content)
else:
with open(self.filename, 'a') as fp:
fp.write(log_content)

def log(self, event, level='INFO', return_str=False):
'''记录日志

@event: 具体事件
@filename: 日志文件
@level: 事件等级
'''

levels = {
'DEBU': 'gray',
'INFO': 'white',
'WARN': 'yellow',
'ERRO': 'red'
}
color = levels.get(level, 'white')

clevel = put_color(level, color, True)
log_time = put_color(get_now(), 'cyan', True)
log_filename = put_color(self.filename.split('/')[-1], 'gray', True)

cevent = put_color(event, color) if color != 'white' else event
log_content = f'[{log_time}] [{clevel}] {cevent}'
if self.verbose == 3:
log_content = f'[{log_time}] [{clevel}] [{log_filename}] {cevent}'

if not event.endswith('\n'): # 保证结尾换行
log_content += '\n'

if self.verbose == 0:
pass
elif self.verbose == 1 and level == 'ERRO':
self._log(log_content)
elif self.verbose == 2 and level != 'DEBU':
self._log(log_content)
elif self.verbose == 3:
self._log(log_content)

if return_str:
return '\r'+log_content.strip()

Python 其实是有 logging 模块的,内置的标准库,专门用于记录 log,不过我记得之前有一个什么需求,用 logging 特别麻烦,所以现在一般还是用自己写的比较简单的 Logger,可以完全自定义。

正餐

以 root 权限重启进程

这几年来,用过 CLI 的程序早已数不清了,我发现对于那些只能使用 root 权限运行的 CLI,在没有加 sudo 运行的时候,有些 CLI 是提示 run as root 之后便退出了;而还有一些会是提示输入 root 密码,然后正常运行,并且这个密码输入的提示也不是作者实现的,用的就是系统终端的提示。

前段时间正好需要写一个以 root 权限运行的 CLI,于是我就去研究了一下,发现 os.exec* 系列都可以实现需求:

1
2
3
4
5
6
7
8
9
10
...
euid = os.geteuid()
if euid != 0:
args = ['sudo', sys.executable] + sys.argv + [os.environ] # type: ignore

if os.system('sudo -n whoami 1>/dev/null 2>/dev/null') != 0:
print('需要 root 权限,请输入 root 密码')

os.execlpe('sudo', *args)
...

本质上就是用新的进程取代拉起它的旧进程,所以 pid 是不会变的(但是要注意加了 sudo 就会拉起一个新的进程)。至于那个 if 嘛,就是终端输入过 root 密码之后,在一定时间内加 root 是不需要输入密码的,这样可以去掉没用的输入密码提示。嗯,非常舒服,这样就算运行的时候忘记输入 sudo 也不需要重新运行了。

转为后台运行

虽然我们可以通过 nohup ... & 来完成,但是不太优雅,因为启动的方式就不一致了,并且 stderr 的log 还需要指定一下,我不太喜欢这样的方式;还有的就是用 screen 或者 tmux,稍有点麻烦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
...
process = subprocess.Popen(
f''' bash <<< '{sys.executable} {" ".join(sys.argv)} >> {Path.runtime_error_logfile} 2>&1 &'$'\\n''echo $!' ''',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)

try:
pid = process.stdout.read().decode('utf-8').strip()
if not pid.isdigit():
raise RuntimeError(process.stderr.read().decode('utf-8'))

except KeyboardInterrupt:
print(log(f'User canceled', 'WARN', return_str=True))

except subprocess.TimeoutExpired:
print(log(f'后台运行失败: 命令行有误', 'ERRO', return_str=True))

except Exception as e:
print(log(f'后台运行失败: {e}', 'ERRO', return_str=True))
sys.exit(1)
else:
print(log(f'后台运行中, pid: {pid}', 'INFO', return_str=True))

sys.exit(0)
...

代码非常简单,用 subprocess.Popen 开个新的进程之后,自己退出,这样新的进程就是孤儿进程了,ppid 变成 1,远程的话断开 ssh 也没事。

异常处理 hack

我们在小学 5 年级的时候就知道,Python 的程序在运行时,如果遇到异常,需要使用 try...except 进行捕获;而那些没有被捕获的异常,Python 统一用 sys.excepthook 这个函数来处理,发生异常时,程序中断并输出“详细的”异常信息。

为啥“详细的”要加引号呢?各位写程序 (bug) 遇到报错的时候肯定有想过:“这个变量值到底是什么样的呢?”。而 Python 的报错是不会打印变量值的。所以我就去研究了一下怎么 hack Python 的异常处理。

首先肯定是怎么 hook 异常处理流程。Python 非常友好地提供了一个接口:sys.excepthook,也就是我们只需要提供一个可接受 3 个参数的函数,自己处理 Python 传入的参数即可。

这三个参数分别为:

  1. 触发的异常类:例如 <class 'ZeroDivisionError'>
  2. 触发的异常类的实例:例如 ZeroDivisionError('division by zero')。我们经常写的 except Exception as e 中的 e 和这个就是一样的类型。
  3. 异常的调用栈:例如:<traceback object at 0x10542ee08>,这个栈可以用 .tb_next 来一层层剥开。由于我们一般只需要最开始的报错,所以需要写个循环去拿位于栈最底下的报错。

那么我们很轻易就可以写出以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
def except_hook(_, msg, tb):
while 1:
# 我们只需要最开始的报错
t = tb.tb_next
if t is None:
break

tb = t

frame_info = inspect.getframeinfo(tb)
filename = frame_info.filename.split('/')[-1]
lineno = frame_info.lineno
function = frame_info.function
code = frame_info.code_context[-1].strip()
print(f'[*] 出错文件: {filename}, 行数: {lineno}, 函数: {function}, 代码: {code}')
...

嗯不错,但这还不够,我们需要知道变量的值,而 tb.tb_frame.f_localstb.tb_frame.f_globals 一个是局部变量一个是全局变量,完美符合我们的需求。那么问题又来了,它们保存着报错时所有的变量,里面这么多变量我们不需要全部打印出来,我们只需要打印涉及到报错代码的变量。那么怎么拿到“涉事”变量呢?

一个可行的思路是利用 ast(抽象语法数)解析“涉事”代码,然后只打印代码中出现的变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
print('[+] 涉及变量:')
tree = ast.parse(code, mode='exec')
names = [node for node in ast.walk(tree) if isinstance(node, ast.Name)]
for name in names:
text = name.id
if text in tb.tb_frame.f_locals:
val = str(tb.tb_frame.f_locals.get(text))
val = val if len(val) < 20 else val[:17]+'...'
print(
f' [-] {text}(L): {val}'
)
elif text in tb.tb_frame.f_globals:
val = tb.tb_frame.f_globals.get(text)
print(
f' [-] {text}(G): {val}'
)
...

但是这样有一个弱点,就是如果涉事代码不止一行的话,就很麻烦:

1
2
3
4
5
6
7
8
a = 1
b = 0
print(
(
a
) / (
b)
)

由于 b) 是非法的语句,所以这样会导致 ast.parse 抛出异常。解决办法也很简单,遍历所有在内存的变量看一下在不在 code 里就好了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try:
tree = ast.parse(code, mode='eval')
except Exception:
names = []
for v in tb.tb_frame.f_locals:
if v in code:
names.append(v)
break
else:
for v in tb.tb_frame.f_globals:
if v in code:
names.append(v)
break
else:
names = [node.id for node in ast.walk(tree) if isinstance(node, ast.Name)]

嗯,针不戳!

当然,我们有可能还希望在报错之后,能够启动一个命令行供我们调试使用,这个实现起来也很简单,就是用《拾遗录第 1 期》里提到的小技巧啦:

1
2
3
4
5
__import__('code').interact(
banner=put_color("\nA clean python shell for debug", 'green'),
exitmsg='\n调试结束,希望人没事 \x1b[34m_\x1b[33m┃\x1b[33m┃\x1b[34m_\x1b[39m',
local=dict(tb.tb_frame.f_globals, **tb.tb_frame.f_locals),
)

最后,我们就可以得到这样的结果(详细代码见最后):

不过需要注意的是,前面提到过,只有在开发者未处理的报错,才会被 except_hook 接管,如果你已经用 try...except 处理过了那么 except_hook 是不会触发的

加餐!

综合以上的小技巧,我们可以写出这样的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import os
import sys
import ast
import time
import inspect
import datetime
import requests
import argparse
import linecache
import subprocess
from functools import partial

from colorama import Fore, Style


class Path:
runtime_error_logfile = 'runtime-error.log'
main_logfile = 'main.log'


class Logger:
def __init__(self, filename, stdout=False, verb=2):
self.stdout = stdout
self.verbose = verb
self.filename = filename

def _log(self, log_content):
if self.stdout:
sys.stderr.write('\r'+log_content)
else:
with open(self.filename, 'a') as fp:
fp.write(log_content)

def log(self, event, level='INFO', return_str=False):
'''记录日志

@event: 具体事件
@filename: 日志文件
@level: 事件等级
'''

levels = {
'DEBU': 'gray',
'INFO': 'white',
'WARN': 'yellow',
'ERRO': 'red'
}
color = levels.get(level, 'white')

clevel = put_color(level, color, True)
log_time = put_color(get_now(), 'cyan', True)
log_filename = put_color(self.filename.split('/')[-1], 'gray', True)

cevent = put_color(event, color) if color != 'white' else event
log_content = f'[{log_time}] [{clevel}] {cevent}'
if self.verbose == 3:
log_content = f'[{log_time}] [{clevel}] [{log_filename}] {cevent}'

if not event.endswith('\n'): # 保证结尾换行
log_content += '\n'

if self.verbose == 0:
pass
elif self.verbose == 1 and level == 'ERRO':
self._log(log_content)
elif self.verbose == 2 and level != 'DEBU':
self._log(log_content)
elif self.verbose == 3:
self._log(log_content)

if return_str:
return '\r'+log_content.strip()


def get_now():
return datetime.datetime.now().strftime('%Y-%m-%d %X')


def timer_unit(s):
'''将 second 秒转为适合的单位
'''

if s <= 1:
return f'{round(s, 1)}s'

num, unit = [
(i, u) for i, u in ((s / 60**i, u) for i, u in enumerate('smhd')) if i >= 1
][-1]

return f'{round(num, 1)}{unit}'


def put_color(string, color, bold=False):
'''
give me some color to see :P
'''

if color == 'gray':
COLOR = Style.DIM + Fore.WHITE
else:
COLOR = getattr(Fore, color.upper(), "WHITE")

return f'{Style.BRIGHT if bold else ""}{COLOR}{str(string)}{Style.RESET_ALL}'


def except_hook(_, msg, tb):
while 1:
t = tb.tb_next
if t is None:
break

tb = t

frame_info = inspect.getframeinfo(tb)
filename = frame_info.filename.split('/')[-1]
lineno = frame_info.lineno
function = frame_info.function
if frame_info.code_context is None:
# 如果已经在 python shell 里
# 则直接打印报错
print(f'[!] Python shell raised an exception: {put_color(msg, "red")}')
return

code = frame_info.code_context[-1].strip()
try:
tree = ast.parse(code, mode='eval')
except Exception:
names = []
for v in tb.tb_frame.f_locals:
if v in code:
names.append(v)
break
else:
for v in tb.tb_frame.f_globals:
if v in code:
names.append(v)
break
else:
names = [node.id for node in ast.walk(tree) if isinstance(node, ast.Name)]

values = []
for name in names:
if name in tb.tb_frame.f_locals:
val = str(tb.tb_frame.f_locals.get(name))
val = val if len(val) < 20 else val[:17]+'...'
values.append(
f'{put_color(name, "yellow")}(L): {put_color(val, "cyan")}'
)
elif name in tb.tb_frame.f_globals:
val = tb.tb_frame.f_globals.get(name)
values.append(
f'{put_color(name, "yellow")}(G): {put_color(val, "cyan")}'
)

var_tip = ''
if values:
var = ' | '.join(values)
var_tip = f', relevant values: {var}'

log(
'something went wrong, '
f'{put_color(msg, "magenta")} in '
f'{put_color(filename, "gray")}, line {put_color(lineno, "cyan")}, '
f'code: {put_color(code, "yellow")}{var_tip}',
'ERRO'
)

if not backflag:
if values:
print('\n[*] relevant values:')
for v in values:
print(f'[+] {v}')

__import__('code').interact(
banner=put_color("\nA clean python shell for debug", 'green'),
exitmsg='\n调试结束,希望人没事 \x1b[34m_\x1b[33m┃\x1b[33m┃\x1b[34m_\x1b[39m',
local=dict(tb.tb_frame.f_globals, **tb.tb_frame.f_locals),
)


sys.excepthook = except_hook

VERSION = '1.0'

parser = argparse.ArgumentParser(description=f'Version: {VERSION}; Running in Py3.x')
parser.add_argument("times", type=int, help="run times: 1, 2, 3 ...")
parser.add_argument("-r", "--root", action="store_true", help="run mode: as user or root")
parser.add_argument("--dev", action="store_true", help="run in dev mode")
parser.add_argument("-v", "--verbose", type=int, choices=[0, 1, 2, 3], help="verbose for output")
parser.add_argument("-b", "--background", action="store_true", help="run in background")
parser.add_argument("--stdout", action="store_true", help="verbose for output")
parser.add_argument("--backflag_do_no_not_use_it", action="store_true", help=argparse.SUPPRESS)
args = parser.parse_args()

times = args.times
root = args.root
dev = args.dev
verbose = args.verbose
background = args.background
stdout = args.stdout

# 不要手动在命令行指定这个参数
# 此参数用于后面的后台运行判断
backflag = args.backflag_do_no_not_use_it

if dev:
# 后台运行不能 --stdout,所以只要指定 --dev,--background 就失效
background = False

# --dev 等于 --stdout -v3
stdout = True
verbose = [verbose, 3][verbose is None]

if verbose is None:
verbose = 2

logger = partial(Logger, stdout=stdout, verb=verbose)
log = logger(Path.main_logfile).log

if root:
euid = os.geteuid()
if euid != 0:
args = ['sudo', sys.executable] + sys.argv + [os.environ] # type: ignore

if os.system('sudo -n whoami 1>/dev/null 2>/dev/null') != 0:
print(log('需要 root 权限,请输入 root 密码', 'WARN', return_str=True))

os.execlpe('sudo', *args)

if background:
for i in ['-b', '--b', '--background', '--stdout']:
if i in sys.argv:
sys.argv.remove(i)

sys.argv.append('--backflag')

process = subprocess.Popen(
f''' bash <<< '{sys.executable} {" ".join(sys.argv)} >> {Path.runtime_error_logfile} 2>&1 &'$'\\n''echo $!' ''',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)

try:
pid = process.stdout.read().decode('utf-8').strip() # type: ignore
if not pid.isdigit():
raise RuntimeError(process.stderr.read().decode('utf-8')) # type: ignore

except KeyboardInterrupt:
print(log(f'User canceled', 'WARN', return_str=True))

except subprocess.TimeoutExpired:
print(log(f'后台运行失败: 命令行有误', 'ERRO', return_str=True))

except Exception as e:
print(log(f'后台运行失败: {e}', 'ERRO', return_str=True))
sys.exit(1)
else:
print(log(f'后台运行中, pid: {pid}', 'INFO', return_str=True))

sys.exit(0)

log(f'verbose level: {verbose}', 'DEBU')
log(f'run times: {times}', 'DEBU')


class RunEngine:
def __init__(self):
pass

def func(self):
for i in range(times):
# 报错测试
'''
a = 1
b = 0
print(
(
a
) / (
b)
)
'''
try:
requests.get('http://baidu.com', timeout=1).text
except Exception as e:
log(f'第 {i+1} 运行失败: {e}', 'WARN')
else:
log(f'第 {i+1} 次运行成功', 'INFO')

def run(self):
s = time.time()
self.func()
e = time.time()
log(f'耗费时间 {timer_unit(e-s)}')


try:
RunEngine().run()
except KeyboardInterrupt:
log_content = log('stopped by ctrl+c', 'WARN', return_str=True)
if not background and not stdout:
print(log_content)

下面做一些测试与展示:

点击这里可以看动态演示)

help


开发者模式测试


后台运行 & root 模式测试


非后台运行时,出现报错自动开始调试


至此,一个非常方便与完善的 CLI 就诞生了 :P


最近关注量涨了不少
不知道是不是被哪位好心的橘友宣传了一波
在此表示感谢 🍭🍭🍭


Python 编写 CLI 的技巧分享
https://www.tr0y.wang/2020/11/24/Python-CLI-Trick/
作者
Tr0y
发布于
2020年11月24日
更新于
2024年4月19日
许可协议