2 多套方案來提高 python web 框架的並發處理能力
Python實戰-從菜鳥到大牛的進階之路 作者:極客學院 投票推薦 加入書簽 留言反饋
<h2>python 常見部署方法有 :</h2><ol><li>fcgi :用 spawn-fcgi 或者框架自帶的工具對各個 project 分別生成監聽進程,然後和 http 服務互動</li><li>wsgi :利用 http 服務的 mod_wsgi 模塊來跑各個 project(web 應用程序或框架簡單而通用的 web 服務器 之間的接口)。</li><li>uwsgi 是一款像 php-cgi 一樣監聽同一端口,進行統一管理和負載平衡的工具,uwsgi,既不用 wsgi 協議也不用 fcgi 協議,而是自創了一個 uwsgi 的協議,據說該協議大約是 fcgi 協議的 10 倍那麽快。</li></ol>
其實 wsgi 是分成 server 和 framework (即 application) 兩部分 (當然還有 middleware)。嚴格說 wsgi 隻是一個協議, 規範 server 和 framework 之間連接的接口。
wsgi server 把服務器功能以 wsgi 接口暴露出來。比如 mod_wsgi 是一種 server, 把 apache 的功能以 wsgi 接口的形式提供出來。 <ol><li>wsgi framework 就是我們經常提到的 django 這種框架。不過需要注意的是, 很少有單純的 wsgi framework , 基於 wsgi 的框架往往都自帶 wsgi server。比如 django、cherrypy 都自帶 wsgi server 主要是測試用途, 發布時則使用生產環境的 wsgi server。而有些 wsgi 下的框架比如 pylons、bfg 等, 自己不實現 wsgi server。使用 paste 作為 wsgi server。</li><li>paste 是流行的 wsgi server, 帶有很多中間件。還有 flup 也是一個提供中間件的庫。</li><li>搞清除 wsgi server 和 application, 中間件自然就清楚了。除了 session、cache 之類的應用, 前段時間看到一個 bfg 下的中間件專門用於給網站換膚的 (skin) 。中間件可以想到的用法還很多。</li><li>這裏再補充一下, 像 django 這樣的框架如何以 fastcgi 的方式跑在 apache 上的。這要用到 flup.fcgi 或者 fastcgi.py (eurasia 中也設計了一個 fastcgi.py 的實現) 這些工具, 它們就是把 fastcgi 協議轉換成 wsgi 接口 (把 fastcgi 變成一個 wsgi server) 供框架接入。整個架構是這樣的: django -> fcgi2wsgiserver -> mod_fcgi -> apache 。</li><li>雖然我不是 wsgi 的粉絲, 但是不可否認 wsgi 對 python web 的意義重大。有意自己設計 web 框架, 又不想做 socket 層和 http 報文解析的同學, 可以從 wsgi 開始設計自己的框架。在 python 圈子裏有個共識, 自己隨手搞個 web 框架跟喝口水一樣自然, 非常方便。或許每個 python 玩家都會經曆一個倒騰框架的</li></ol>
uwsgi 的主要特點如下: <ul><li>超快的性能。</li><li>低內存占用(實測為 apache2 的 mod_wsgi 的一半左右)。</li><li>多app管理。</li><li>詳盡的日誌功能(可以用來分析 app 性能和瓶頸)。</li><li>高度可定製(內存大小限製,服務一定次數後重啟等)。</li></ul>
uwsgi 的官方文檔:
http://projects.unbit.it/uwsgi/wiki/doc
nginx.conflocation / { include uwsgi_params uwsgi_pass 127.0.0.1:9090} </pre>
啟動 app
uwsgi -s :9090 -w myapp </pre>
uwsgi 的調優參數~
uwsgi 的參數以上是單個 project 的最簡單化部署,uwsgi 還是有很多令人稱讚的功能的,例如:並發 4 個線程: uwsgi -s :9090 -w myapp -p 4主控製線程 +4 個線程: uwsgi -s :9090 -w myapp -m -p 4執行超過 30 秒的 client 直接放棄: uwsgi -s :9090 -w myapp -m -p 4 -t 30限製內存空間 128m: uwsgi -s :9090 -w myapp -m -p 4 -t 30 --limit-as 128服務超過 10000 個 req 自動 respawn: uwsgi -s :9090 -w myapp -m -p 4 -t 30 --limit-as 128 -r 10000後台運行等: uwsgi -s :9090 -w myapp -m -p 4 -t 30 --limit-as 128 -r 10000 -d uwsgi.log </pre>
為了讓多個站點共享一個 uwsgi 服務,必須把 uwsgi 運行成虛擬站點:去掉“-w myapp”加上”–vhost”:
uwsgi -s :9090 -m -p 4 -t 30 --limit-as 128 -r 10000 -d uwsgi.log --vhost
然後必須配置 virtualenv,virtualenv 是 python 的一個很有用的虛擬環境工具,這樣安裝:
最後配置 nginx,注意每個站點必須單獨占用一個 server,同一 server 不同 location 定向到不同的應用不知為何總是失敗,估計也 算是一個 bug。
server { listen 80; server_name app1.mydomain; location / { include uwsgi_params; uwsgi_pass 127.0.0.1:9090; uwsgi_param uwsgi_pyhome /var//myenv; uwsgi_param uwsgi_script myapp1; uwsgi_param uwsgi_chdir /var//myappdir1; }}server { listen 80; server_name app2.mydomain; location / { include uwsgi_params; uwsgi_pass 127.0.0.1:9090; uwsgi_param uwsgi_pyhome /var//myenv; uwsgi_param uwsgi_script myapp2; uwsgi_param uwsgi_chdir /var//myappdir2; }} </pre>
這樣,重啟 nginx 服務,兩個站點就可以共用一個 uwsgi 服務了。
再來搞下 fastcgi 的方式
location / {fastcgi_param request_method $request_method;fastcgi_param query_string $query_string;fastcgi_param content_type $content_type;fastcgi_param content_length $content_length;fastcgi_param gateway_interface cgi/1.1;fastcgi_param server_software nginx/$nginx_version;fastcgi_param remote_addr $remote_addr;fastcgi_param remote_port $remote_port;fastcgi_param server_addr $server_addr;fastcgi_param server_port $server_port;fastcgi_param server_name $server_name;fastcgi_param server_protocol $server_protocol;fastcgi_param script_filename $fastcgi_script_name;fastcgi_param path_info $fastcgi_script_name;fastcgi_pass 127.0.0.1:9002;} </pre>
location /static/ {root /path/to/;if (-f $request_filename) { rewrite ^/static/(.*)$ /static/$1 break;} } </pre>
啟動一個 fastcgi 的進程
spawn-fcgi -d /path/to/ -f /path/to//index.py -a 127.0.0.1 -p 9002 </pre>
用 web.py 寫的一個小 demo 測試
#!/usr/bin/env python # -*- coding: utf-8 -*-import weburls = ("/.*", "hello")app = web.application(urls, globals)ss hello: def get(self):return \''hello, world!\''if __name__ == "__main__": web.wsgi.runwsgi =mbda func, addr=none: web.wsgi.runfcgi(func, addr) app.run </pre>
啟動 nginx
nginx </pre>
這樣就 ok 了~ <h2>下麵開始介紹下 我一般用的方法:</h2>
圖片 2.1 pic
前端 nginx 用負責負載分發:
部署的時候采用了單 ip 多端口方式,服務器有 4 個核心,決定開 4 個端口對應,分別是 8885~8888,修改
upstream backend {server 127.0.0.1:8888;server 127.0.0.1:8887;server 127.0.0.1:8886;server 127.0.0.1:8885;} server{listen 80;server_name message.test;keepalive_timeout 65; #proxy_read_timeout 2000; #sendfile on;tcp_nopush on;tcp_nody on; location / {proxy_pass_header server;proxy_set_header host $http_host;proxy_redirect off;proxy_set_header x-real-ip $remote_addr;proxy_set_header x-scheme $scheme;proxy_pass http://backend;}} </pre>
然後運行四個 python 程序,端口為咱們配置好的端口
我這裏用 tornado 寫了一個執行係統程序的例子:
import subprocessimport tornado.ioloopimport timeimport tlimport functoolsimport osss genericsubprocess (object): def __init__ ( self, timeout=-1, **popen_args ):self.args = dictself.args["stdout"] = subprocess.pipeself.args["stderr"] = subprocess.pipeself.args["close_fds"] = trueself.args.update(popen_args)self.ioloop = noneself.expiration = noneself.pipe = noneself.timeout = timeoutself.streams = self.has_timed_out = false def start(self):"""spawn the task.throws runtimeerror if the task was already started."""if not self.pipe is none: raise runtimeerror("cannot start task twice")self.ioloop = tornado.ioloop.ioloop.instanceif self.timeout > 0: self.expiration = self.ioloop.add_timeout( time.time + self.timeout, self.on_timeout )self.pipe = subprocess.popen(**self.args)self.streams = [ (self.pipe.stdout.fileno, ), (self.pipe.stderr.fileno, ) ]for fd, d in self.streams: gs = tl.tl(fd, tl.f_getfl)| os.o_ndy tl.tl( fd, tl.f_setfl, gs) self.ioloop.add_handler( fd, self.stat, self.ioloop.read|self.ioloop.error) def on_timeout(self):self.has_timed_out = trueself.cancel def cancel (self ) :"""cancel task executionsends sigkill to the child process."""try: self.pipe.killexcept: pass def stat( self, *args ):\''\''\''check processpletion and consume pending i/o data\''\''\''self.pipe.pollif not self.pipe.returncode is none: \''\''\''cleanup handlers and timeouts\''\''\'' if not self.expiration is none:self.ioloop.remove_timeout(self.expiration) for fd, dest in self.streams:self.ioloop.remove_handler(fd) \''\''\''schedulle callback (first try to read all pending data)\''\''\'' self.ioloop.add_callback(self.on_finish)for fd, dest in self.streams: while true:try: data = os.read(fd, 4096) if len(data) == 0:break dest.extend([data])except: break @property def stdout(self):return self.get_output(0) @property def stderr(self):return self.get_output(1) @property def status(self):return self.pipe.returncode def get_output(self, index ):return "".join(self.streams[index][1]) def on_finish(self):raise notimplementedss subprocess (genericsubprocess): """create new instance arguments:callback: method to be called afterpletion. this method should take 3 arguments: statuscode(int), stdout(str), stderr(str), has_timed_out(boolean)timeout: wall time allocated for the process toplete. after this expires task.cancel is called. a negative timeout value means no limit is set the task is not started until start is called. the process will then be spawned using subprocess.popen(**popen_args). the stdout and stderr are always set to subprocess.pipe. """ def __init__ ( self, callback, *args, **kwargs):"""create new instancearguments: callback: method to be called afterpletion. this method should take 3 arguments: statuscode(int), stdout(str), stderr(str), has_timed_out(boolean) timeout: wall time allocated for the process toplete. after this expires task.cancel is called. a negative timeout value means no limit is setthe task is not started until start is called. the process will then be spawned using subprocess.popen(**popen_args). the stdout and stderr are always set to subprocess.pipe."""self.callback = callbackself.done_callback = falsegenericsubprocess.__init__(self, *args, **kwargs) def on_finish(self):if not self.done_callback: self.done_callback = true \''\''\''prevent calling callback twice\''\''\'' self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))if __name__ == "__main__": ioloop = tornado.ioloop.ioloop.instance def print_timeout( status, stdout, stderr, has_timed_out) :assert(status!=0)assert(has_timed_out)print "ok status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def print_ok( status, stdout, stderr, has_timed_out) :assert(status==0)assert(not has_timed_out)print "ok status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def print_error( status, stdout, stderr, has_timed_out):assert(status!=0)assert(not has_timed_out)print "ok status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def stop_test:ioloop.stop t1 = subprocess( print_timeout, timeout=3, args=[ "sleep", "5" ] ) t2 = subprocess( print_ok, timeout=3, args=[ "sleep", "1" ] ) t3 = subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas", "1" ] ) t4 = subprocess( print_error, timeout=3, args=[ "cat", "/etc/sdfsdfsdfsdfsdfsdfsdf" ] ) t1.start t2.start try:t3.startassert(false) except:print "ok" t4.start ioloop.add_timeout(time.time + 10, stop_test) ioloop.start </pre>
大家可以先用 uwsgi,要還是有壓力和堵塞的話,可以用用 nginx 做負載。
我自己的經驗來看還是這個靠譜~
其實 wsgi 是分成 server 和 framework (即 application) 兩部分 (當然還有 middleware)。嚴格說 wsgi 隻是一個協議, 規範 server 和 framework 之間連接的接口。
wsgi server 把服務器功能以 wsgi 接口暴露出來。比如 mod_wsgi 是一種 server, 把 apache 的功能以 wsgi 接口的形式提供出來。 <ol><li>wsgi framework 就是我們經常提到的 django 這種框架。不過需要注意的是, 很少有單純的 wsgi framework , 基於 wsgi 的框架往往都自帶 wsgi server。比如 django、cherrypy 都自帶 wsgi server 主要是測試用途, 發布時則使用生產環境的 wsgi server。而有些 wsgi 下的框架比如 pylons、bfg 等, 自己不實現 wsgi server。使用 paste 作為 wsgi server。</li><li>paste 是流行的 wsgi server, 帶有很多中間件。還有 flup 也是一個提供中間件的庫。</li><li>搞清除 wsgi server 和 application, 中間件自然就清楚了。除了 session、cache 之類的應用, 前段時間看到一個 bfg 下的中間件專門用於給網站換膚的 (skin) 。中間件可以想到的用法還很多。</li><li>這裏再補充一下, 像 django 這樣的框架如何以 fastcgi 的方式跑在 apache 上的。這要用到 flup.fcgi 或者 fastcgi.py (eurasia 中也設計了一個 fastcgi.py 的實現) 這些工具, 它們就是把 fastcgi 協議轉換成 wsgi 接口 (把 fastcgi 變成一個 wsgi server) 供框架接入。整個架構是這樣的: django -> fcgi2wsgiserver -> mod_fcgi -> apache 。</li><li>雖然我不是 wsgi 的粉絲, 但是不可否認 wsgi 對 python web 的意義重大。有意自己設計 web 框架, 又不想做 socket 層和 http 報文解析的同學, 可以從 wsgi 開始設計自己的框架。在 python 圈子裏有個共識, 自己隨手搞個 web 框架跟喝口水一樣自然, 非常方便。或許每個 python 玩家都會經曆一個倒騰框架的</li></ol>
uwsgi 的主要特點如下: <ul><li>超快的性能。</li><li>低內存占用(實測為 apache2 的 mod_wsgi 的一半左右)。</li><li>多app管理。</li><li>詳盡的日誌功能(可以用來分析 app 性能和瓶頸)。</li><li>高度可定製(內存大小限製,服務一定次數後重啟等)。</li></ul>
uwsgi 的官方文檔:
http://projects.unbit.it/uwsgi/wiki/doc
nginx.conflocation / { include uwsgi_params uwsgi_pass 127.0.0.1:9090} </pre>
啟動 app
uwsgi -s :9090 -w myapp </pre>
uwsgi 的調優參數~
uwsgi 的參數以上是單個 project 的最簡單化部署,uwsgi 還是有很多令人稱讚的功能的,例如:並發 4 個線程: uwsgi -s :9090 -w myapp -p 4主控製線程 +4 個線程: uwsgi -s :9090 -w myapp -m -p 4執行超過 30 秒的 client 直接放棄: uwsgi -s :9090 -w myapp -m -p 4 -t 30限製內存空間 128m: uwsgi -s :9090 -w myapp -m -p 4 -t 30 --limit-as 128服務超過 10000 個 req 自動 respawn: uwsgi -s :9090 -w myapp -m -p 4 -t 30 --limit-as 128 -r 10000後台運行等: uwsgi -s :9090 -w myapp -m -p 4 -t 30 --limit-as 128 -r 10000 -d uwsgi.log </pre>
為了讓多個站點共享一個 uwsgi 服務,必須把 uwsgi 運行成虛擬站點:去掉“-w myapp”加上”–vhost”:
uwsgi -s :9090 -m -p 4 -t 30 --limit-as 128 -r 10000 -d uwsgi.log --vhost
然後必須配置 virtualenv,virtualenv 是 python 的一個很有用的虛擬環境工具,這樣安裝:
最後配置 nginx,注意每個站點必須單獨占用一個 server,同一 server 不同 location 定向到不同的應用不知為何總是失敗,估計也 算是一個 bug。
server { listen 80; server_name app1.mydomain; location / { include uwsgi_params; uwsgi_pass 127.0.0.1:9090; uwsgi_param uwsgi_pyhome /var//myenv; uwsgi_param uwsgi_script myapp1; uwsgi_param uwsgi_chdir /var//myappdir1; }}server { listen 80; server_name app2.mydomain; location / { include uwsgi_params; uwsgi_pass 127.0.0.1:9090; uwsgi_param uwsgi_pyhome /var//myenv; uwsgi_param uwsgi_script myapp2; uwsgi_param uwsgi_chdir /var//myappdir2; }} </pre>
這樣,重啟 nginx 服務,兩個站點就可以共用一個 uwsgi 服務了。
再來搞下 fastcgi 的方式
location / {fastcgi_param request_method $request_method;fastcgi_param query_string $query_string;fastcgi_param content_type $content_type;fastcgi_param content_length $content_length;fastcgi_param gateway_interface cgi/1.1;fastcgi_param server_software nginx/$nginx_version;fastcgi_param remote_addr $remote_addr;fastcgi_param remote_port $remote_port;fastcgi_param server_addr $server_addr;fastcgi_param server_port $server_port;fastcgi_param server_name $server_name;fastcgi_param server_protocol $server_protocol;fastcgi_param script_filename $fastcgi_script_name;fastcgi_param path_info $fastcgi_script_name;fastcgi_pass 127.0.0.1:9002;} </pre>
location /static/ {root /path/to/;if (-f $request_filename) { rewrite ^/static/(.*)$ /static/$1 break;} } </pre>
啟動一個 fastcgi 的進程
spawn-fcgi -d /path/to/ -f /path/to//index.py -a 127.0.0.1 -p 9002 </pre>
用 web.py 寫的一個小 demo 測試
#!/usr/bin/env python # -*- coding: utf-8 -*-import weburls = ("/.*", "hello")app = web.application(urls, globals)ss hello: def get(self):return \''hello, world!\''if __name__ == "__main__": web.wsgi.runwsgi =mbda func, addr=none: web.wsgi.runfcgi(func, addr) app.run </pre>
啟動 nginx
nginx </pre>
這樣就 ok 了~ <h2>下麵開始介紹下 我一般用的方法:</h2>
圖片 2.1 pic
前端 nginx 用負責負載分發:
部署的時候采用了單 ip 多端口方式,服務器有 4 個核心,決定開 4 個端口對應,分別是 8885~8888,修改
upstream backend {server 127.0.0.1:8888;server 127.0.0.1:8887;server 127.0.0.1:8886;server 127.0.0.1:8885;} server{listen 80;server_name message.test;keepalive_timeout 65; #proxy_read_timeout 2000; #sendfile on;tcp_nopush on;tcp_nody on; location / {proxy_pass_header server;proxy_set_header host $http_host;proxy_redirect off;proxy_set_header x-real-ip $remote_addr;proxy_set_header x-scheme $scheme;proxy_pass http://backend;}} </pre>
然後運行四個 python 程序,端口為咱們配置好的端口
我這裏用 tornado 寫了一個執行係統程序的例子:
import subprocessimport tornado.ioloopimport timeimport tlimport functoolsimport osss genericsubprocess (object): def __init__ ( self, timeout=-1, **popen_args ):self.args = dictself.args["stdout"] = subprocess.pipeself.args["stderr"] = subprocess.pipeself.args["close_fds"] = trueself.args.update(popen_args)self.ioloop = noneself.expiration = noneself.pipe = noneself.timeout = timeoutself.streams = self.has_timed_out = false def start(self):"""spawn the task.throws runtimeerror if the task was already started."""if not self.pipe is none: raise runtimeerror("cannot start task twice")self.ioloop = tornado.ioloop.ioloop.instanceif self.timeout > 0: self.expiration = self.ioloop.add_timeout( time.time + self.timeout, self.on_timeout )self.pipe = subprocess.popen(**self.args)self.streams = [ (self.pipe.stdout.fileno, ), (self.pipe.stderr.fileno, ) ]for fd, d in self.streams: gs = tl.tl(fd, tl.f_getfl)| os.o_ndy tl.tl( fd, tl.f_setfl, gs) self.ioloop.add_handler( fd, self.stat, self.ioloop.read|self.ioloop.error) def on_timeout(self):self.has_timed_out = trueself.cancel def cancel (self ) :"""cancel task executionsends sigkill to the child process."""try: self.pipe.killexcept: pass def stat( self, *args ):\''\''\''check processpletion and consume pending i/o data\''\''\''self.pipe.pollif not self.pipe.returncode is none: \''\''\''cleanup handlers and timeouts\''\''\'' if not self.expiration is none:self.ioloop.remove_timeout(self.expiration) for fd, dest in self.streams:self.ioloop.remove_handler(fd) \''\''\''schedulle callback (first try to read all pending data)\''\''\'' self.ioloop.add_callback(self.on_finish)for fd, dest in self.streams: while true:try: data = os.read(fd, 4096) if len(data) == 0:break dest.extend([data])except: break @property def stdout(self):return self.get_output(0) @property def stderr(self):return self.get_output(1) @property def status(self):return self.pipe.returncode def get_output(self, index ):return "".join(self.streams[index][1]) def on_finish(self):raise notimplementedss subprocess (genericsubprocess): """create new instance arguments:callback: method to be called afterpletion. this method should take 3 arguments: statuscode(int), stdout(str), stderr(str), has_timed_out(boolean)timeout: wall time allocated for the process toplete. after this expires task.cancel is called. a negative timeout value means no limit is set the task is not started until start is called. the process will then be spawned using subprocess.popen(**popen_args). the stdout and stderr are always set to subprocess.pipe. """ def __init__ ( self, callback, *args, **kwargs):"""create new instancearguments: callback: method to be called afterpletion. this method should take 3 arguments: statuscode(int), stdout(str), stderr(str), has_timed_out(boolean) timeout: wall time allocated for the process toplete. after this expires task.cancel is called. a negative timeout value means no limit is setthe task is not started until start is called. the process will then be spawned using subprocess.popen(**popen_args). the stdout and stderr are always set to subprocess.pipe."""self.callback = callbackself.done_callback = falsegenericsubprocess.__init__(self, *args, **kwargs) def on_finish(self):if not self.done_callback: self.done_callback = true \''\''\''prevent calling callback twice\''\''\'' self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))if __name__ == "__main__": ioloop = tornado.ioloop.ioloop.instance def print_timeout( status, stdout, stderr, has_timed_out) :assert(status!=0)assert(has_timed_out)print "ok status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def print_ok( status, stdout, stderr, has_timed_out) :assert(status==0)assert(not has_timed_out)print "ok status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def print_error( status, stdout, stderr, has_timed_out):assert(status!=0)assert(not has_timed_out)print "ok status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def stop_test:ioloop.stop t1 = subprocess( print_timeout, timeout=3, args=[ "sleep", "5" ] ) t2 = subprocess( print_ok, timeout=3, args=[ "sleep", "1" ] ) t3 = subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas", "1" ] ) t4 = subprocess( print_error, timeout=3, args=[ "cat", "/etc/sdfsdfsdfsdfsdfsdfsdf" ] ) t1.start t2.start try:t3.startassert(false) except:print "ok" t4.start ioloop.add_timeout(time.time + 10, stop_test) ioloop.start </pre>
大家可以先用 uwsgi,要還是有壓力和堵塞的話,可以用用 nginx 做負載。
我自己的經驗來看還是這個靠譜~