RabbitMQ Yapısı ve Python ile Uygulama Geliştirme

By | 10 October 2015

Günümüzde asenkron işlemler artık vazgeçilmez oldu.İşleri sıraya sokup, sırası geldiği zaman yapılmasını sağlayabiliriz. Bunu sağlayan ‘AMQP’ adındaki protokoldür. Bu protokolü kullanan yapı da ‘RabbitMQ’. Açık kaynaklı olmakla birlikte, ‘Erlang’ adındaki programlama dili ile yazılmıştır. Yukarıdaki resimde görüldüğü gibi basit bir yapımız var. Adı ‘P’ olan dairemiz, kuyruğa mesaj gönderen programdır. Kırmızı alanlar, kuyruktur. Adı ‘C’ olan daireler ise kuyruğu dinleyen işçilerdir(consumer, worker), burada bir de ‘P'(publisher) ile kuyruk arasındaki ‘exchange’ diye bir yapı var, kısaca bahsetmek gerekir ise mesajın eğer birden fazla kuyruk varsa, nasıl ve ne türde gönderileceğini belirler. Bunları da öğrendikten sonra artık ‘RabbitMQ’ kullanmaya başlayabiliriz. Kurulum için resmi websitesi olan https://www.rabbitmq.com/download.html websitesine girdikten sonra size uygun olan kurulumu tamamlayabilirsiniz. Ben Centos işletim sisteminde ilerleyeceğim. Kurulum yaptıktan sonra alttaki komutu giriyoruz ve sistemi ayağa kaldırıyoruz.

Screen Shot 2015-10-10 at 8.24.49 PM

Ben Python2.7 ile kod yazacağım için RabbitMQ’yu kullanabileceğim modülü yüklemem lazım. Bunun için yine resimdeki komutu çalıştırıyorum. Screen Shot 2015-10-10 at 8.27.18 PM ‘pika’ isimli modülü yükledikten sonra artık kod yazmaya başlayabilirim.

Benim senaryom şu; ekrana 10 tane mesaj yazdırmak istiyorum ama bunu 2 tane worker’ın yapmasını istiyorum. Yani ilk worker ‘Bu 1. mesaj’ derken, ikinci worker ‘Bu 2.mesaj’ demesini ve bunu aynı anda yapmasını sağlayacağım.

import pika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Burada ilk önce modülümüzü yükledik ve localhost’ta bulunan RabbitMQ servisine bağlandık. Arından ‘channel’ adındaki bir channel objesi oluşturduk.

channel.queue_declare(queue='messages')
for i in range(1, 11):
    message = '%s.mesaj' % (i)
    channel.basic_publish(exchange='', routing_key='messages', body=message)

print 'Mesaj gonderildi.'
connection.close()

Karışık gibi görünsede aslında çok basit bir yapısı var, şimdi tek tek açıklayacağımz hepsini. İlk satırda bir kuyruk belirledik ve adını koyduk. Kuyruğumuzun adı ‘messages’. Daha sonra for ve range sayesinde 10 tane iş yaptırabileceğim bir döngü kuruyorum. ‘channel.basic_publish’ diyerek kuyruğa bir iş atıyorum. Burada mesajın ne türde ne nasıl ileteceğini belirlediğimiz exchange yapısının adını boş bırakıyorum. Eğer boş bırakırsak default exchange devreye giriyor ve bu benim şimdilik işimi görüyor. ‘routing_key=messages’ mesajın hangi kuyruğa gideceğini belirlediğimiz kısım. ‘body=message’ diyerek de ileteceğimiz mesajın içeriğini belirliyoruz. Mesajı gönderdikten sonra “print ‘Mesaj gonderildi'” komutunu vererek ekrana bir bilgi mesajı gösteriyoruz, ardından ‘connection.close()’ niteliği ile bağlantığıyı kapatıyoruz. Bu dosyanın adını ‘publisher.py’ diye kaydediyorum ve çalıştırıyorum, bakalım ne sonuç alacağız?

Screen Shot 2015-10-10 at 8.41.48 PM

Görüldüğü gibi mesajımızı gönderebildik. Ama şimdi de ‘consumer.py’ adındaki dosyamız gerekiyor. Bu mesajı ne yapacağını yani worker’ı orada tanımlayacağız. Bu arada dosyaların adını değiştirebiliriz, consumer ya da publisher olmak zorunda değil. Ben worker.py diye kaydedeceğim, şimdi de onun kodlarını yazalım.

import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='messages')
print 'Cikmak icin CTRL-V kullanin'

def printMessage(ch, method, properties, body):
    print 'Gelen mesaj:%s' (body,)

channel.basic_consume(callback, queue='messages', no_ack=True)
channel.start_consuming()

İlk beş satırın ne yaptığını biliyoruz zaten. Hemen ardından ‘printMessage’ adında bir method oluşturdum ve bunun içine bazı parametreler verdim. Burada body parametresi bizim gönderdiğimiz mesaj. Diğerlerini ben de bilmiyorum önümüzdeki günlerde öğrenmeyi hedefliyorum ama genel kullanımı bu şekilde. Bu method içinde mesajı ekrana yazdırıyorum. ‘channel.basic_consume(printMessage, queue=’messages’, no_ack=True) diyerek worker’ın tüm özelliklerini giriyorum. ‘queue=messages’ diyerek hangi kuyruğu dinlemesini söyledim. ‘no_ack=True’ ise işinin bittikten sonra haber vermesini istedim. Böylece ekrana mesajı bastım, benden sonraki worker devam edebilirim demesini sağlıyorum. ‘channel.start_consuming()’ ile de çalışmasını başlatıyorum.

Kodlarımı bu kadar. Şimdi 3 terminal ekranı açalım ve ilk 2 terminalden ‘worker.py’ dosyamızı çalıştıralım böylece kuyruğu dinlemeye başlasın.

Screen Shot 2015-10-10 at 9.01.08 PM

Workerlar şu an kuyruğu dinliyor ve mesaj gelirse ekrana basacaklar. Şimdi 3.terminalden ‘publisher.py’ dosyamızı çalıştıralım ve sonucu gözlemleyelim.

Screen Shot 2015-10-10 at 9.02.56 PM

Ve işte istediğim oldu 🙂 ilk worker 1.mesajı yazarken, ikinci worker 2.mesajı yazdı. Dikkat etmemiz gereken konu bunu aynı anda yapmaları. Normalde daha uzun sürecek bir işi yarı yarıya indirdim. Bu örnek ile RabbitMQ ve asenkron yapısını en basit haliyle ve en anlaşılır şekilde kullandım. Ben de yeni yeni kullanmaya ve öğrenmeye başladım. Umarım faydalı olur, bir sonraki yazımda görüşmez üzere… 🙂

Leave a Reply

Your email address will not be published. Required fields are marked *

*