zmq 异步消息队列
原文链接 https://liutaihua.github.io/2013/03/07/ZMQ-asynchronous-message-queue.html
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。
zmq push--pull 方式
在ZMQ中是淡化服务端和客户端的概念的:
- 相对的服务端:
- 创建一个SUBer订阅者bind一个端口, 用来接收数据
- 创建一个zmq.PUSH
- 创建一个zmq poller轮询对象,
- 将sub注册到poller, 并赋予zmq.POLLIN意味轮询进来的msg
- 创建sock=poller.poll()开始轮询
- 当有msg发送到suber订阅者的监听端口后, sock.recv()方法将会收到msg,
- 最后使用之前创建的pusher, 使用pusher.send(msg)将消息推送到连接到的puller, 如果无puller, 此msg将被丢弃
相对的client:
- 创建zmq.PULL 连接到服务端接收push过来的消息
消息创建者:
- 创建一个zmq.PUB对象, 意味着此对象为一个消息发布者: pub = context.socket(zmq.PUB)
- 连接到服务端的suber的监听端口: pub.connect('tcp://%s:%s' % (sub_host, sub_port))
- 最后将需要发送的msg, 使用pub.send(msg)发送给suber订阅者
代码示例:
对于服务端: import zmq context = zmq.Context()
"""定义一个订阅者, 注意的是,这里的订阅者是从服务端来看, 这个method是订阅者(从这个角度来说服务端也能看成是客户端了), 对应的client创建一个发布者(PUB)时, 使用connect连接的就是此服务端的订阅者.""" def create_subscriber(port): sub = context.socket(zmq.SUB) sub.bind('tcp://*:%s' % port) sub.setsockopt(zmq.SUBSCRIBE, '') return sub
"""此模式在服务端暂时没用用到""" def create_publisher(port): pub = context.socket(zmq.PUB) pub.bind('tcp://*:%s' % port) pub.setsockopt(zmq.HWM, 0) return pub
"""定义个推送者, 如果有client连上此pusher, 当有新消息时, client的pull.recv()将会获得msg""" def create_pusher(port): pusher = context.socket(zmq.PUSH) pusher.bind('tcp://*:%s' % port) return pusher
def main(): """初始化函数方法""" sub = create_subscriber(args.sub_port) pub = create_publisher(args.pub_port) pusher = create_pusher(args.push_port)
""" 创建一个Poller初始化, 将sub(订阅者)注册到此Poller, 并使用POLLIN参数, 在后面的poller.pull()方法中, 将能pull到最新的,从client程序发到sub来的消息, 最后使用pub和pusher将消息send出去""" poller = zmq.Poller() poller.register(sub, zmq.POLLIN)
while True:
socks = poller.poll() \# 创建socks
for k, v in socks:
""" 获取消息,此消息实际是由client程序的pub发送到此server的sub,
然后经由poller.register, 被poller.poll()实例经由recv方法获取"""
message = k.recv()
pub.send(message)
# FIXME: Use gevent instead.
try:
\# 使用pusher将msg推送给client程序的puller.recv,
pusher.send(message.split(' ', 1)[-1], zmq.NOBLOCK)
except:
pass
客户端: import zmq context = zmq.Context()
"""创建一个发布者, 发布者连接到server程序的订阅者, 产生msg后send, 其他语言比如cpp, 也是一样连 接的是上面server程序的sub, 发送mq""" def pub(): pub = context.socket(zmq.PUB) pub.connect('tcp://%s:%s' % (sub_host, sub_port)) while True: msg = 'abc hello' + str(time.time()) pub.send(msg) print 'sending', msg time.sleep(1)
# 暂时没有用到, 此仅作示例 def sub(): sub = context.socket(zmq.SUB) sub.setsockopt(zmq.SUBSCRIBE, '') sub.connect('tcp://%s:%s' % (pub_host, pub_port))
while True:
msg = sub.recv()
print 'Got:', msg
"""创建一个puller, 连接的是server程序中的pusher, server端pusher有新msg时, 会push到此puller""" def pull(): pull = context.socket(zmq.PULL) pull.connect('tcp://%s:%s' % (pusher_host, pusher_port))
"""一个死循环, 不断pull新msg, 接收到msg后根据msg再进行相关业务逻辑,
一般msg采用json格式, 能非常方便处理不能语言程序之间, 不同进程之间的通信."""
while True:
msg = pull.recv()
print 'Got: ', msg