zeromq : 무한 대기를 방지하는 방법?
방금 ZMQ를 시작했습니다. 워크 플로가 다음과 같은 앱을 디자인하고 있습니다.
- 여러 클라이언트 중 하나 (임의의 PULL 주소가 있음) 5555에서 서버에 요청을 PUSH합니다.
- 서버는 클라이언트 PUSH를 영원히 기다리고 있습니다. 하나가 오면 해당 특정 요청에 대해 작업자 프로세스가 생성됩니다. 예, 작업자 프로세스는 동시에 존재할 수 있습니다.
- 해당 프로세스가 작업을 완료하면 결과를 클라이언트에 푸시합니다.
PUSH / PULL 아키텍처가 이에 적합하다고 가정합니다. 제발 나 수정 이에.
그러나 이러한 시나리오를 어떻게 처리합니까?
- client_receiver.recv ()는 서버가 응답하지 않을 때 무한한 시간을 기다립니다.
- 클라이언트는 요청을 보낼 수 있지만 즉시 실패하므로 작업자 프로세스는 server_sender.send ()에서 영원히 멈춰 있습니다.
그렇다면 PUSH / PULL 모델에서 타임 아웃 과 같은 것을 어떻게 설정 합니까?
편집 : user938949의 제안에 감사드립니다, 나는 작동하는 대답을 얻었고 후손을 위해 그것을 공유하고 있습니다.
zeromq> = 3.0을 사용하는 경우 RCVTIMEO 소켓 옵션을 설정할 수 있습니다.
client_receiver.RCVTIMEO = 1000 # in milliseconds
그러나 일반적으로 폴러를 사용할 수 있습니다.
poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send
그리고 poller.poll()
타임 아웃을합니다 :
evts = poller.poll(1000) # wait *up to* one second for a message to arrive.
evts
수신 할 항목이 없으면 빈 목록이됩니다.
로 폴링 zmq.POLLOUT
하여 전송이 성공하는지 확인할 수 있습니다 .
또는 실패했을 수있는 피어의 경우를 처리하려면 다음을 수행하십시오.
worker.send(msg, zmq.NOBLOCK)
이는 항상 즉시 반환됩니다. 전송이 완료되지 않으면 ZMQError (zmq.EAGAIN)가 발생합니다.
이것은 user938949의 답변과 http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/를 참조한 후 만든 빠른 해킹 이었습니다 . 더 잘하면 답변을 게시 해 주시면 답변 을 추천하겠습니다 .
안정성에 대한 지속적인 솔루션 을 원하는 사람들 은 http://zguide.zeromq.org/page:all#toc64를 참조하십시오.
zeromq 버전 3.0 (베타 ATM)은 ZMQ_RCVTIMEO 및 ZMQ_SNDTIMEO에서 시간 제한 을 지원합니다 . http://api.zeromq.org/3-0:zmq-setsockopt
섬기는 사람
zmq.NOBLOCK은 클라이언트가 존재하지 않을 때 send ()가 차단되지 않도록합니다.
import time
import zmq
context = zmq.Context()
ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")
i=0
while True:
i=i+1
time.sleep(0.5)
print ">>sending message ",i
try:
ventilator_send.send(repr(i),zmq.NOBLOCK)
print " succeed"
except:
print " failed"
고객
The poller object can listen in on many recieving sockets (see the "Python Multiprocessing with ZeroMQ" linked above. I linked it only on work_receiver. In the infinite loop, the client polls with an interval of 1000ms. The socks object returns empty if no message has been recieved in that time.
import time
import zmq
context = zmq.Context()
work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")
poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)
# Loop and accept messages from both channels, acting accordingly
while True:
socks = dict(poller.poll(1000))
if socks:
if socks.get(work_receiver) == zmq.POLLIN:
print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
print "error: message timeout"
The send wont block if you use ZMQ_NOBLOCK, but if you try closing the socket and context, this step would block the program from exiting..
The reason is that the socket waits for any peer so that the outgoing messages are ensured to get queued.. To close the socket immediately and flush the outgoing messages from the buffer, use ZMQ_LINGER and set it to 0..
If you're only waiting for one socket, rather than create a Poller
, you can do this:
if work_receiver.poll(1000, zmq.POLLIN):
print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
print "error: message timeout"
You can use this if your timeout changes depending on the situation, instead of setting work_receiver.RCVTIMEO
.
참고URL : https://stackoverflow.com/questions/7538988/zeromq-how-to-prevent-infinite-wait
'Program Tip' 카테고리의 다른 글
Android Studio-XML 편집기 자동 완성이 지원 라이브러리에서 작동하지 않음 (0) | 2020.11.22 |
---|---|
MongoDB는 한 번에 여러 컬렉션 쿼리 (0) | 2020.11.21 |
setup.py에서 라이브러리 버전을 어떻게 지정할 수 있습니까? (0) | 2020.11.21 |
SELECT * FROM 여러 테이블. (0) | 2020.11.21 |
Python 방식으로 csv 파일에 헤더 추가 (0) | 2020.11.21 |