python 开发的三种运行模式详细介绍

Python 三种运行模式

Python作为一门脚本语言,使用的范围很广。有的同学用来算法开发,有的用来验证逻辑,还有的作为胶水语言,用它来粘合整个系统的流程。不管怎么说,怎么使用python既取决于你自己的业务场景,也取决于你自己的python应用能力。就我个人而言,我觉得python作为既可以用来进行业务的开发,也可以进行产品原型的开发.一般来说,python的运行主要下面这三种模式。

1.单循环模式

单循环模式使用的最多,也最简单,当然也最稳定。为什么呢,因为单循环本来代码就写的很少,出错的机会就更少,所以一般只要写对了接口,犯错误的机会还是很低的。当然,我们不是说单循环就没什么用,恰恰相反。单循环模式是我们最经常使用的一种模式。这种开发对于一些小工具、小应用、小场景特别合适。

#!/usr/bin/python
import os
import sys
import re
import signal
import time

g_exit = 0

def sig_process(sig, frame):
  global g_exit
  g_exit = 1
  print 'catch signal'

def main():
  global g_exit
  signal.signal(signal.SIGINT, sig_process)
  while 0 == g_exit:
    time.sleep(1)

    '''
    module process code
    ''' 

if __name__ == '__main__':
  main()

2.多线程模式

多线程模式经常用在那些容易阻塞的场合。比如多线程客户端读写,多线程web访问等等。这里的多线程有个特点,那就是每个线程都是按照客户端创建的。简单的举例就是服务器socket,来一个socket创建一个thread,这样如果存在多个用户的话,就有多个thread并发连接。这种方式比较简单,用起来很快,缺点就是所有业务有可能并发执行,全局数据保护起来很麻烦。

#!/usr/bin/python
import os
import sys
import re
import signal
import time
import threading

g_exit=0

def run_thread():
  global g_exit
  while 0 == g_exit:
    time.sleep(1)

    '''
    do jobs per thread
    '''

def sig_process(sig, frame):
  global g_exit
  g_exit = 1

def main():

  global g_exit

  signal.signal(signal.SIGINT, sig_process)
  g_threads = []
  for i in range(4):
    td = threading.Thread(target = run_thread)
    td.start()
    g_threads.append(td)

  while 0 == g_exit:
    time.sleep(1)

  for i in range(4):
    g_threads[i].join()

if __name__ == '__main__':
  main()

3.reactor模式

reactor模式,不复杂,简单的来说,就是利用多线程来处理每一个业务。如果一个业务已经被某一个thread处理了,那么其他的thread就不能再次处理这个业务了。这样,它相当于解决了一个问题,也就是我们在前面所说的锁的问题。因此,对于这种模式的开发者来说,编写业务其实是一件简单的事情,因为他所要关注的只是自己的一亩三分地就可以了。之前云风同学编写的skynet就是这么一种模式,只不过它使用了c+lua来开发的。其实只要了解了reactor模式本身,用什么语言开发不重要,关键是理解reactor的精髓就可以了。

如果写成code,那应该是这样的,

#!/usr/bin/python

import os
import sys
import re
import time
import signal
import threading

g_num = 4
g_exit =0
g_threads = []
g_sem = []
g_lock = threading.Lock()
g_event = {}

def add_event(name, data):
  global g_lock
  global g_event

  if '' == name:
    return

  g_lock.acquire()
  if name in g_event:
    g_event[name].append(data)
    g_lock.release()
    return

  g_event[name] = []

  '''
  0 means idle, 1 means busy
  '''
  g_event[name].append(0)
  g_event[name].append(data)
  g_lock.release()

def get_event(name):
  global g_lock
  global g_event

  g_lock.acquire()
  if '' != name:
    if [] != g_event[name]:
      if 1 != len(g_event[name]):
        data = g_event[name][1]
        del g_event[name][1]
        g_lock.release()
        return name, data
      else:
        g_event[name][0] = 0

  for k in g_event:
    if 1 == len(g_event[k]):
      continue

    if 1 == g_event[k][0]:
      continue

    g_event[k][0] =1
    data = g_event[k][1]
    del g_event[k][1]
    g_lock.release()
    return k, data

  g_lock.release()
  return '', -1

def sig_process(sig, frame):
  global g_exit
  g_exit =1
  print 'catch signal'

def run_thread(num):
  global g_exit
  global g_sem
  global g_lock

  name = ''
  data = -1

  while 0 == g_exit:
    g_sem[num].acquire()

    while True:
      name, data = get_event(name)
      if '' == name:
        break

      g_lock.acquire()
      print name, data
      g_lock.release()

def test_thread():
  global g_exit

  while 0 == g_exit:
    for i in range(100):
      add_event('1', (i << 2) + 0)
      add_event('2', (i << 2) + 1)
      add_event('3', (i << 2) + 2)
      add_event('4', (i << 2) + 3)

    time.sleep(1)

def main():
  global g_exit
  global g_num
  global g_threads
  global g_sem

  signal.signal(signal.SIGINT, sig_process)
  for i in range(g_num):
    sem = threading.Semaphore(0)
    g_sem.append(sem)
    td = threading.Thread(target=run_thread, args=(i,))
    td.start()
    g_threads.append(td)

  '''
  test thread to give data
  '''
  test = threading.Thread(target=test_thread)
  test.start()

  while 0 == g_exit:
    for i in range(g_num):
      g_sem[i].release()
    time.sleep(1)

  '''
  call all thread to close
  '''
  for i in range(g_num):
    g_sem[i].release()

  for i in range(g_num):
    g_threads[i].join()

  test.join()
  print 'exit now'

'''
entry
'''
if __name__ == '__main__':
  main()

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • 使用nodejs、Python写的一个简易HTTP静态文件服务器

    日常开发过程中,我们经常需要修改一些放在 CDN 上的静态文件(如 JavaScript.CSS.HTML 文件等),这个过程中,我们希望能有一种方式将线上 CDN 的目录映射为本地硬盘上的某个目录,这样,当我们在本地修改了某个文件时,不需要发布,刷新后马上能看到效果. 比如,我们的 CDN 域名是:http://a.mycdn.com,本地对应的目录是:D:\workassets,我们希望所有对 http://a.mycdn.com/* 的访问被映射到本地的 D:\workassets\* 下

  • Python使用中文正则表达式匹配指定中文字符串的方法示例

    本文实例讲述了Python使用中文正则表达式匹配指定中文字符串的方法.分享给大家供大家参考,具体如下: 业务场景: 从中文字句中匹配出指定的中文子字符串 .这样的情况我在工作中遇到非常多, 特梳理总结如下. 难点: 处理GBK和utf8之类的字符编码, 同时正则匹配Pattern中包含汉字,要汉字正常发挥作用,必须非常谨慎.推荐最好统一为utf8编码,如果不是这种最优情况,也有酌情处理. 往往一个具有普适性的正则表达式会简化程序和代码的处理,使过程简洁和事半功倍,这往往是高手和菜鸟最显著的差别.

  • Python的Bottle框架中返回静态文件和JSON对象的方法

    代码如下: # -*- coding: utf-8 -*- #!/usr/bin/python # filename: todo.py # codedtime: 2014-8-28 20:50:44 import sqlite3 import bottle @bottle.route('/help3') def help(): return bottle.static_file('help.html', root='.') #静态文件 @bottle.route('/json:json#[0-9

  • Python中二维列表如何获取子区域元素的组成

    用过NumPY的应该都知道,在二维数组中可以方便地使用区域切片功能,如下图: 而这个功能在Python标准库的List中是不支持的,在List中只能以一维方式来进行切片操作: 但有时候我只想用一下这个功能,但又不想引入NumPY.其实这时候我也是可以在Python中实现的.这时候,只需在一个类中实现__getitem__特殊方法: class Array: """实现__getitem__,支持序列获取元素.Slice等特性""" def __i

  • Python的Flask框架及Nginx实现静态文件访问限制功能

    Nginx配置 Ngnix,一个高性能的web服务器,毫无疑问它是当下的宠儿.卓越的性能,灵活可扩展,在服务器领域里攻城拔寨,征战天下. 静态文件对于大多数website是不可或缺的一部分.使用Nginx来处理静态文件也是常见的方式.然而,一些静态文件,我们并不像任何情况下都公开给任何用户.例如一些提供给用户下载的文件,一些用户上传的涉及用户隐私的图片等.我们我希望用户登录的情况下可以访问,未登录的用户则不可见. 粗略的处理,在后端程序可以做过滤,渲染页面的时候,在视图逻辑里面验证用户登录,然后

  • python解决汉字编码问题:Unicode Decode Error

    前言 最近由于项目需要,需要读取一个含有中文的txt文档,完了还要保存文件.文档之前是由base64编码,导致所有汉字读取显示乱码.项目组把base64废弃之后,先后出现两个错误: ascii codec can't encode characters in position ordinal not in range 128 UnicodeDecodeError: 'utf8' codec can't decode byte 0x. 如果对于ascii.unicode和utf-8还不了解的小伙伴

  • python中实现迭代器(iterator)的方法示例

    概述 迭代器是访问集合元素的一种方式.迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束.迭代器只能往前不会后退. 延迟计算或惰性求值 (Lazy evaluation) 迭代器不要求你事先准备好整个迭代过程中所有的元素.仅仅是在迭代至某个元素时才计算该元素,而在这之前或之后,元素可以不存在或者被销毁.这个特点使得它特别适合用于遍历一些巨大的或是无限的集合. 今天创建了一个实体类,大致如下: class Account(): def __init__(self, account_n

  • Python 3.x 连接数据库示例(pymysql 方式)

    由于 MySQLdb 模块还不支持 Python3.x,所以 Python3.x 如果想连接MySQL需要安装 pymysql 模块. pymysql 模块可以通过 pip 安装.但如果你使用的是 pycharm IDE,则可以使用 project python 安装第三方模块. [File] >> [settings] >> [Project: python] >> [Project Interpreter] >> [Install按钮] 由于Python

  • 一步步教你用Python实现2048小游戏

    前言 2048游戏规则:简单的移动方向键让数字叠加,并且获得这些数字每次叠加后的得分,当出现2048这个数字时游戏胜利.同时每次移动方向键时,都会在这个4*4的方格矩阵的空白区域随机产生一个数字2或者4,如果方格被数字填满了,那么就GameOver了. 主逻辑图 逻辑图解:黑色是逻辑层,蓝色是外部方法,红色是类内方法,稍后即可知道~ 下面容我逐行解释主逻辑main()函数,并且在其中穿叉外部定义的函数与类. 主逻辑代码解读(完整代码见文末) 主逻辑main如下,之后的是对主函数中的一些方法的解读

  • python django 访问静态文件出现404或500错误

    django static文件夹下面的内容方法不了 出现404 500错误 需要查看自己的settings文件确保有一下内容 import os PROJECT_ROOT = os.path.dirname(__file__) DEBUG = True STATIC_URL = '/static/' STATICFILES_DIRS = ( os.path.join(PROJECT_ROOT, 'static'), ) STATICFILES_FINDERS = ( 'django.contri

随机推荐