I. Importing
from carrot import connection as carrot_connection
from carrot import messaging
II. Creating the Connection object
params = dict(hostname=FLAGS.rabbit_host,
              port=FLAGS.rabbit_port,
              ssl=FLAGS.rabbit_use_ssl,
              userid=FLAGS.rabbit_userid,
              password=FLAGS.rabbit_password,
              virtual_host=FLAGS.rabbit_virtual_host)
# Unpack the dict so that each entry is passed as a keyword argument
connection = carrot_connection.BrokerConnection(**params)
III. Creating a Consumer
consumer = messaging.Consumer(connection=self,
                              topic=topic,
                              proxy=proxy)
IV. Sample (http://nathanborror.com/posts/2009/may/20/working-django-and-rabbitmq/)
########## Set global variables ###################
AMQP_SERVER = 'localhost'
AMQP_PORT = 5672
AMQP_USER = 'guest'
AMQP_PASSWORD = 'guest'
AMQP_VHOST = '/'
########## Create a consumer ###################
>>> from flopsy import Connection, Consumer
>>> consumer = Consumer(connection=Connection())
>>> consumer.declare(queue='books', exchange='readernaut', routing_key='importer', auto_delete=False)
>>> def message_callback(message):
...     print('Received: ' + message.body)
...     # Acknowledge the message so the broker can drop it from the queue
...     consumer.channel.basic_ack(message.delivery_tag)
>>>
>>> consumer.register(message_callback)
>>> consumer.wait()
########## Create a publisher ###################
>>> from flopsy import Connection, Publisher
>>> publisher = Publisher(connection=Connection(), exchange='readernaut', routing_key='importer')
>>> publisher.publish('Test message!')
>>> publisher.close()
[ Using Pools ]
1. When a Publisher makes a cast call, use pools.Pool (Connections are taken from a Pool)
from eventlet import pools

class Pool(pools.Pool):
    # Called by the pool whenever it needs a new item
    def create(self):
        LOG.debug('Pool creating new connection')
        return Connection.instance(new=True)

ConnectionPool = Pool(
    max_size=FLAGS.rpc_conn_pool_size,
    order_as_stack=True)

# Borrow a connection; it goes back to the pool when the block exits
with ConnectionPool.item() as conn:
    publisher = DirectPublisher(connection=conn, msg_id=msg_id)
2. When a Consumer receives a message, use GreenPool (the handler method runs in the Pool)
from eventlet import greenpool

self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)

def _process_data(self, msg_id, ctxt, method, args):
    ...
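3. To run a Consumer as a separate Thread so that execution branches off, use eventlet.spawn
- The calling code keeps running without blocking on wait()
- The spawned Thread (a GreenThread) is handed back as the return value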
import eventlet
self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
4. To branch off an internal LoopingCall, as in Report, use eventlet.greenthread
from eventlet import greenthread

def _inner():
    ...
    greenthread.sleep(interval)
    ...

greenthread.spawn(_inner)
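[ Terminology ]
- queue : the name of the Queue
- exchange : the name of the exchange that the Queue is bound to
- exchange_type :
. direct : the routing_key must match exactly
. topic : the routing_key can be matched against a pattern
. fanout : no routing_key is needed; the message goes to every bound Queue
- routing_key : messages are delivered only to the Queue bound under that key name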
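The three exchange types can be illustrated with a small pure-Python model. This is a sketch of the routing rules only, not broker code: `topic_matches`, `route`, and the sample bindings are hypothetical names, and the topic wildcards follow the AMQP convention in which `*` matches exactly one dot-separated word and `#` matches zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches an AMQP-style topic pattern."""
    p = pattern.split('.')
    k = routing_key.split('.')

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == '#':
            # '#' may absorb zero or more words
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j < len(k) and p[i] in ('*', k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def route(exchange_type, bindings, routing_key):
    """Return the queues that would receive a message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == 'fanout':
        # The routing key is ignored; every bound queue gets the message
        return [queue for queue, _ in bindings]
    if exchange_type == 'direct':
        return [queue for queue, key in bindings if key == routing_key]
    if exchange_type == 'topic':
        return [queue for queue, key in bindings
                if topic_matches(key, routing_key)]
    raise ValueError('unknown exchange type: %s' % exchange_type)


bindings = [('books', 'importer'),
            ('audit', 'importer.#'),
            ('logs', '*.error')]

print(route('direct', bindings, 'importer'))   # ['books']
print(route('topic', bindings, 'importer'))    # ['books', 'audit']
print(route('topic', bindings, 'db.error'))    # ['logs']
print(route('fanout', bindings, 'anything'))   # ['books', 'audit', 'logs']
```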