Make love, not var_dump())

Kaptanın seyir defteri

3 sene önce yazdığım, Kodaman.org'un kapanmasıyla tarihin sayfalarına karışmasına gönlümün el vermediği bir belge ile daha karşınızdayım :)

Neden gerekli?

RabbitMQ Logo

Ölçeklenebilir bir web ortamı oluşturmak için anlık yapılmayan (asenkron) işlemler çok önemlidir. Bir mesaj kuyruğu imlementasyonu ile, işlemleri sıraya sokup, sırası geldiği zaman işlenmesini sağlayabilirsiniz.



Mesajları bir sıraya sokup işlem yaptırmaya aslında yabancı değiliz; bir çoğumuz kendi "mesaj kuyruğu" implementasyonunu biraz kötü bir şekilde yapmıştır. Örneğin, kullanıcılara gönderilecek e-postaları bir veritabanına(MySQL?) yazıp, cron job'lar ile belli zaman aralıklarında bu veritabanından bir veriyi çekip, e-postayı gönderdikten sonra bu ilgili kaydı veritabanından silme işlemini yapmışızdır.


Ancak burada bazı sorunlarımız var:
  1. Veritabanları bu iş için değil (yaz, belli aralıklarla oku 'select * from queue where completed=0 limit 1', sil)
  2. Anlık istek sayınız (concurrency) arttıkça, "işin mesajını"i veritabanına "yazmak", işi tamamlamaktan daha uzun zaman alabiliyor
  3. "Ölçekleme"den bashedince, veritabanına "yazma" işlemini ölçeklemeniz, gerçekten çok zor.
  4. Üstelik, bu mesaj kuyruğunu implemente etmek, test etmek ve performansını düzeltmek için uzunca bir zaman harcamanız gerekebiliyor.

RabbitMQ bunların hepsini sizin için yapıyor.

Özgür mesaj kuyruğu

Mesaj kuyruğu (veya "enterprise service bus" ) çözümleri, kurumsal pazarda sıkça kullanılan ve pastanın büyük bir payının IBM,Tibco gibi firmaların elinde bulunduğu bir pazardı. 2004 yılında başlayan ve 2006'da olgunlaşan açık standartlı AMQP çözümü ile "mesaj kuyruğu" çözümleri bir standart ile açık hale geldi.


OpenAMQ, Apache QPid ve Red Hat Enterprise MRG gibi kimi kurumsal kimi ise açık çözümler piyasaya çıktı. Bunların arasından RabbitMQ ise, ölçeklenebilirliği, kararlılığı ve basitliği ile ön plana çıktı.


Erlang ile geliştirilen bu uygulama, çoğu Erlang uygılamasında olduğu gibi thread-safe bir yapıya sahip ve kolayca ölçeklenebiliyor.

RabbitMQ Şema

Bilmeniz gereken bir kaç terim:
consumer : Kuyruğu dinleyen uygulama
publisher : Kuyruğa mesaj gönderen uygulama
VirtualHost : VirtualHost'lar, genelde yetki yönetimi için kullanılır, Exchange ve Queue'lar virtualhost'lar içinde tanımlanır
Exchange : Mesajı ilgili "routing key"e göre ilgili queue'ya yönlendiren bölüm
Queue : Mesajların son olarak düştüğü kuyruk
Exchange type : Gelen mesajın, "routing key"e göre hangi queue'ya "nasıl" gönderileceğini belirtir

Kurulum

RabbitMQ kurulumu

RabbitMQ'nun sitesinde Linux ve Windows için hazırlanmış paketler bulunmakta. Debian/Ubuntu kullanıcıları
apt-get install rabbitmq-server
komutu ile uygulamayı kurabilirler. Sunucunun 5672 portundan başladığından emin olun :)

İstemci kütüphaneleri kurulumu

- Python kütüphanesi
Linux kullanıcıları aşağıdaki komutlarla py-amqplib kütüphanesini kurabilir
  wget -c http://barryp.org/static/software/download/py-amqplib/0.6/amqplib-0.6.tgz
  tar -zxvf amqplib-0.6.tgz
  cd amqplib-0.6
  python setup.py install


- PHP Kütüphanesi
SVN deposundaki kodları çekerek, oluşturacağınız PHP dosyanızı koyacağınız dizinden erişilebilecek bir dizine kopyalayın
svn checkout http://php-amqplib.googlecode.com/svn/trunk/ php-amqplib-read-only

Örnek işlem

Sisteme bir kullanıcı kaydolduğu zaman, onun bulunduğu yere yakın kullanıcılara e-posta gönderen bir uygulama yazacak olalım. Yapmamız gereken işlemler şunlar:
  • Veritabanından kaydolan kullanıcının bulunduğu yeri al
  • Veritabanından, bu bölgeye yakın diğer kullanıcıları bul
  • Bulunan kullanıcılara e-posta gönder

Ve bu işlemin 10 saniye civarında sürdüğünü düşünün. Normalde bu işlemi "anlık" yapmaya kalkarsak, her kayıt olan kullanıcı kayıt sırasında, aslında onu çok da ilgilendirmeyen bir işlem için 10 saniye beklemek zorunda kalacak. Aslında bu tam "kuyruğa gönderilecek" bir iş.


Yazacağımız bir "publisher" ile, çalışmakta olan RabbitMQ sunucumuza "X kullanıcısı kayıt oldu, gerekli işlemler yapılmalı" tarzında bir mesaj göndermemiz ve işlemleri yapacak sorumlu uygulamanın (consumer), sırası gelince bu işi yapmasını sağlayabiliriz. Ve biz mesajı gönderdiğikten sonra kullanıcının bu işlemi beklemesine gerek kalmayacaktır.


Aşağıda PHP ile yazılmış bir "publisher" ve "Python" ile yazılmış bir consumer bu işlemleri yapıyor.


consumer.py
"""
  Kullanici kayit islemlerinden gelen mesajlari dinleyen,
  mesaj geldiginde ilgili kullanicinin bulundugu yeri veritabanindan cekme,
  kullanicinin bulundugu bu yere yakin diger kullanicilari cekme,
  ve bulunan diger kullanicilara e-posta gonderme islemlerini 
  simule eden kod 

  http://barryp.org/software/py-amqplib/ adresinden py-amqplib'i indirip kurmalisiniz
"""

from amqplib import client_0_8 as amqp
import time # islemlerin surdugu zamanlari simule etmek icin

class rabbitMQMailQueue:
  # yapilandirma degiskenleri
  def __init__(self):
    # rabbitMQ ayarlari
    self.rabbitMQConf = {"hostPort" : "localhost:5672",
                    "user" : "guest",
                    "pass" : "guest",
                    "virtualHost" : "/"
    }

    # kuyrugun ayarlari
    self.queueConf = {"name" : "eposta",
                      "exchange" : "kayit"
    }

  # kuyrugu dinle ve gelen her mesajda doJobs methodunu calistir
  def listen(self):
    try:
        # amqp'ye baglan
        conn = amqp.Connection(self.rabbitMQConf['hostPort'],
                               self.rabbitMQConf['user'],
                               self.rabbitMQConf['pass'],
                               virtualHost = self.rabbitMQConf['virtualHost'],
                               insist = False)
        chan = conn.channel()

        chan.queue_declare( self.queueConf['name'],
                            durable=True, # kuyruk yeniden basladiginda yeniden yaratilsin
                            exclusive=False, 
                            auto_delete=False) # son islemden sonra kuyrugu silme

        chan.exchange_declare(self.queueConf['exchange'],
                              type="direct", # direkt olarak ilgili kuyruga gonder
                              durable=True,
                              auto_delete=False,) 

        # queue ile exchange'i bagla
        chan.queue_bind(self.queueConf['name'], self.queueConf['exchange'])

        # islemden sonra cagirilacak methodu ayarla
        chan.basic_consume(self.queueConf['name'],
                           no_ack=True, 
                           callback=self.doJobs) # ilgili methodu calistir


        # kuyrugu dinlemeye basla
        while True:
          chan.wait()

        chan.close()
        conn.close()

    except Exception, error :
        print "Bir hata olustu : ",error

  # kuyruktan her mesaj geldiginde ilgili kullaniciya ait asagidaki islemleri yap
  def doJobs(self,msg):
    print "Kuyruktan bir mesaj alindi"
    userId = msg.body 
    print userId, " id'li kullanicinin bolgesi bulunuyor"
    time.sleep(1) # islemin 1 saniye surdugunu varsayalim
    print userId, " id'li kullanicinin bulundugu yere yakin kullanicilar bulunuyor"
    time.sleep(6) # islemin 6 saniye surdugunu varsayalim
    print "Bulunan kullanicilara e-posta gonderiliyor"
    time.sleep(3) # e-posta gonderme isleminin 3 saniye surdugunu varsayalim
    # toplamda islemler 10 saniye suruyor
    # bu islem bittikten sonra kuyruktan yeni mesaj aliniyor

# uygulamayi baslat
if __name__ == '__main__':
    mailQueue = rabbitMQMailQueue()
    mailQueue.listen()
publisher.php
/**
 * Kullanici kayit olduktan sonra kuyruga kullanici id'sini 
 * mesaj gonderen uygulama
 * 
 * http://code.google.com/p/php-amqplib/ adresinden PHP kutuphanesini indirmelisiniz
 */

//indirdiginiz kutuphane icindeki dosyanin yolunu gosterin
require_once('../amqp.inc');


class sendRegisterMessageToQueue {
  // yapilandirma
  function __construct() {
    // rabbitmq ayarlari
    $this->rabbitMQConf = array("host"=>"localhost",
                                "port"=>"5672",
                                "user"=>"guest",
                                "pass"=>"guest",
                                "virtualHost"=>"/");

    // kuyruk ayarlari
    $this->queueConf = array("exchange"=>"kayit");
  }

  // kullanici id'sini mesaj olarak gonder
  function send($userId) {
    try{
      // amqp'ye baglan
      $conn = new AMQPConnection($this->rabbitMQConf['host'], 
                                 $this->rabbitMQConf['port'],
                                 $this->rabbitMQConf['user'],
                                 $this->rabbitMQConf['pass']);

      $channel = $conn->channel();

      $channel->access_request($this->rabbitMQConf['virtualHost'], false, false, true, true);

 
      // mesajı gönder
      $msg = new AMQPMessage($userId, array('content_type' => 'text/plain'));
      $channel->basic_publish($msg, $this->queueConf['exchange']);

      $channel->close();
      $conn->close();
      
      return true;
    }
    catch(Exception $e) {
      echo "Bir hata oluştu ".$e->getMessage();
    }

  }
}


$messageSender = new sendRegisterMessageToQueue();

$userId = 777;

if ( $messageSender->send($userId ) ) {
   echo "Kullanicinin yakinindaki insanlara e-posta gonderme mesaji iletildi"; 
}

Kaynaklar

Kaynaklar:
http://www.rabbitmq.com/
http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/