RabbitMQ
Sadržaj
RabbitMQ – broker poruka
RabbitMQ je softver koji predstavlja posrednika u razmeni poruka (eng. message broker) koji u originalu implemntira AMQP protokol (Advanced Message Queuing Protocol) . RabbitMQ server program je pisan u Erlang programskom jeziku i izgradjen je na Open Telecom Platform frejmorku za klasterovanje i brz oporavak od pada sistema. Najprostije rečeno, RabbitMQ prestavlja softver gde možemo definisati redove, aplikacije se mogu povezati na te redove i slati ili primati poruke preko njih.
Poruka može sadržati bilo kakvu vrstu informacija. Na primer, može sadržati informaciju o procesu ili zadatku koji bi trebalo da bude pokrenut na nekoj drugoj aplikaciji (koja može biti i na drugom serveru), ili može biti obična tekstualna poruka. Menadžer reda za poruke skladišti poruke sve dok se ne pojavi aplikacija koja preuzima poruku sa reda. Nakon toga, aplikacija koja je preuzela poruku, obrađuje je i koristi u skladu sa svojim potrebama.
Šta nam RabbitMQ omogućava?
Razmena poruka nam omogućava lakše povezivanje i skaliranje aplikacija. Aplikacije se mogu međusobno povezivati kao komponente neke veće aplikacije ili se mogu povezivati sa uređajima korisnika i primati podatke. O razmeni poruka možemo misliti kao o isporuci nekih podataka, push notifikacijama, upotrebi publish/subscribe sistema ili pak redova za poruke. RabbitMQ nudi mnoštvo funkcija koje vam omogućavaju da povećate pouzdanost na štetu preformansi, što podrazumeva trajnost, potvrdu isporuke, potvrdu pošiljaoca i visoku dostupnost. RabbitMQ klijenti postoje za gotovo sve programske jezike, a isto tako su podržani različiti protokoli za razmenu poruka.
Instalacija
Da bismo na primerima pokazali kako funkcioniše razmena poruka i koje sve mogućnosti RabbitMQ poseduje, neophodno je da na svom računaru imam instalirane odgovarajuće pakete. Ukoliko radite sa Debian ili Ubuntu (14.04-18.04) distribucijama dovoljno je da preuzmete i instalirate paket sa navedene adrese:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server_3.7.7-1_all.deb
Ukoliko želite da instalirate sve neophodne pakete ručno ili koristite Windows operativni sistem detljnija uputstva možete pronaći na sledećoj adresi:
https://www.rabbitmq.com/download.html
RabbitMQ server će biti automatski pokrenut nakon instalacije paketa, ali i sa svakim ponovnim pokretanjem računara. Status servera možete proveriti komandom:
sudo rabbitmqctl status
Napomena: Za instalaciju i korišćenje RabbitMQ servera neophodne su vam sudo privilegije.
Kako zapravo radi?
RabbitMQ server možemo zamisliti kao poslovnicu Pošte Srbija. Kada želimo da pošaljemo poruku odlazimo u najbližu poslovnicu i u poštansko sanduče ubacujemo pismo. Nakon toga, pismo će, gotovo sigurno, biti isporučeno onome kome je namenjeno. U ovoj analogiji, RabbitMQ je poštansko sanduče, poslovnica Pošte i poštar.
Glavna razlika između Pošte i RabbitMQ-a je u tome što RabbitMQ radi tako što prihvata, skladišti i prosleđuje podatke u binarnom formatu – poruke, dok Pošta radi sa kovertama i papirima. RabbitMQ, kao i svi programi koji omogućavaju razmenu poruka, koriste određeni žargon. Neki od najbitnijih pojmova su:
- Proizvodnja (Producing): znači slanje poruke (sending). Program koji šalje poruku je proizvođač, tj. producer.
- Red (Queue): je naziv za poštansko sanduče koje se nalazi unutar RabbitMQ-a, a u suštini predstavlja veliki bafer za poruke. Sve poruke se skladište u odgovarajući red. Red je ograničen memorijom i veličinom diska host računara.
- Potrošnja (Consuming): znači primanje poruke (receiving). Program koji čeka da primi poruke je potrošač, tj. consumer.
Napomena: Proizvođač, potrošač i broker ne moraju obitavati na istom hostu; zapravo, u većini situacija to nije slučaj.
Razmena poruka
Poruke se ne prosleđuju direktno u red, umesto toga, proizvođač šalje poruke na razmenu. Proces razmene je odgovoran za rutiranje poruka u različite redove. Razmena prhvata poruku od proizvođača i određuje njenu rutu na osnovu ključeva za rutiranje i vezivanje (bindings and routing keys). Veza (binding) povezuje proces razmene sa redovima.
Tok poruka
- Proizvođač šalje poruku na razmenu. Kada kreiramo razmenu definišemo i njen tip.
- Razmena prhvata poruku i postaje odgovorna za njeno rutiranje, koje vrši na osnovu atributa poruke.
- Kreirane su veze od razmene ka redovima. U prikazanom primeru imamo dva različita reda i po jednu vezu ka svakom od njih.
- Poruka stoji u redu dok ne bude preuzeta od strane potrošača.
- Potrošač obrađuje poruke.
Tipovi razmene
Postoje četiri vrste razmene:
- Direktna razmena (Direct) Poruka će biti isporučena u redove čiji se ključevi za vezivanje potpuno slažu sa ključevima za rutiranje navedinim u poruci
- Rasuta razemena (Fanout) Poruka će biti prosleđena svim redovima koji su vezani za razmenu
- Tematska razmena (Topic) Poruka će biti isporučena u redove ukoliko ključ za rutiranje zadovoljava šemu definisanu vezom reda i razmene
- Razmena zaglavlja (Headers) Za rutiranje koristi atribute koji se nalaze u zaglavlju poruke, a ne ključeve za rutiranje.
Primer Hello World
Napisaćemo dva programa u Java programskom jeziku; proizvođač koji će izvršavati slanje jedne poruke, i potrošač koji će prihvatati poruke i ispisivati ih u terminalu. Na dijagramu iznad prikazani su Proizvođač
, označen sa “P”, i Potrošač
, označen sa “C”. Crveni pravougaonik u sredini je red – bufer za poreke. Da bismo napisali ove programe neophodan nam je Java klijent za RabbitMQ server. Neophodne biblioteke možete pronaći na sledećim vezama:
klijentska biblioteka
SLF4J API
SLF4J Simple
Klase koje su nam potrebne importujemo na uobicajen način:
import com.rabbitmq.client.*;
Slanje poruke
Klasa Send
će predstavljati našeg proizvođača. Proizvođač
se konektuje na RabbitMQ server, šalje jednu poruku, zatvara konekciju ka serveru i završava sa radom. Kostur klase:
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
Konekciju ka serveru kreiramo na sledeći način:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
Klasa Connection
apstrahuje soket konekciju na server i vodi računa o pregovorima o verziji protokola, autetnifikaciji i ostalim bitnim stvarima koje se dešavaju u pozadini. Konekcija predstvlja TCP konkciju između aplikacije i RabbitMQ brokera. Na ovaj način je ostvarena konekcija prema brokeru na lokalnoj mašini (localhost). Ukoliko se broker nalazi na nekoj drugoj mašini jednostavno bismo odgovarajućoj funkciji prosledili naziv ili IP adresu te mašine. Nakon toga, kreiramo kanal (channel
) koji nam omogućava korišćenje većine funkcija za komunikaciju sa brokerom. Kanal je virtualna konekcija unutar konekcije. Da bismo vršili slanje poruke, moramo da deklarišemo red u koji želimo da pošeljemo poruku.
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
Ako deklarisani red ne postoji, biće kreiran, u suprotnom, nema nikakvih promena. Kada obavimo slanje poruke zatvaramo kanal, a zatim i konekciju ka brokeru.
channel.close(); connection.close();
Prijem Poruke
Klasa Receive
će predstavljati potrošača. Ona će čitati poruke koje pristižu na red koji će konstantno osluškivati. Kao i kod klase Send
i ovde na sličan način otvaramo konekciju i kanal, i deklarišemo red sa koga želimo da čitamo poruke.
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); } }
Ovde ćemo, takođe, deklarisati red za slučaj da program Potrošač
prvi počne sa radom, a deklarisanjem obezbeđujemo da red sigurno postoji. Sada moramo obavestiti server da želimo da nam dostavlja poruke koje stižu u red. S obzirom da server poruke šalje asinhrono moramo obezbediti callback
funkciju (u formi objekta klase DefaultConsumer
) koja će baferovati poruke sve dok proces Potrošač
ne bude bio spreman da ih iskoristi.
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
Pokretanje
Sada su obe klase kompletirane. Da bismo ih kompajlirali neophodno je da RabbitMQ Java klijent dodamo u putanju klase (classpath). Ukoliko koristite Eclipse radno okruženje putanja je sledeća: desni klik na projekat → Build Path → Configure Build Path, zatim izaberete karticu Libraries i dodate vec pomenute .jar fajlove koji su deo* RabbitMQ Java* klijenta. Nakon toga oba programa pokrecete opcijom Run as Application. Dodavanje .jar fajlova, kompajliranje i pokretanje možete izvršiti i iz terminala pomoću sledećih komandi:
- kompajliranje klasa
javac -cp amqp-client-4.0.2.jar Send.java Recv.java
- pokretanje programa
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send
Prikaz redova
Korićenjem rabbitmqctl
alatke možemo da vidimo koliko je redova kreirao RabbitMQ i koliko se poruka u njima nalazi: Ubuntu:
sudo rabbitmqctl list_queues
Windows:
rabbitmqctl.bat list_queues
Rezultat komande nakon pokretanja programa Proizvođač
:
Timeout: 60.0 seconds ... Listing queues for vhost / ... hello 1
Rezultat komande nakon što RabbitMQ broker prosledi poruku Potrošaču
:
Timeout: 60.0 seconds ... Listing queues for vhost / ... hello 0
Round-robin dispatching
Ako postoji samo jedan potrošač, a više poruka u redu, sve poruke će biti prosleđene tom potrošaču. Međutim, ako bismo imali više poruka u redu i više potrošača RabbitMQ broker bi primenio round-robin sistem gde će svaki n-ti potrošač dobiti n-tu poruku iz reda. Na taj način se obezbeđuje ravnomerno prosleđivanje poruka, tačnije, u proseku će svaki potrošač dobiti isti broj poruka.
Message acknowledgment
Kao što možemo primetiti, nakon što isporuči poruku onom klijentu koji osluškuje odgovarajući red, RabbitMQ briše poruku iz tog reda. Ukoliko se dogodi da jedan od potrošača u toku obrade poruke prestane sa radom poruka će biti izgubljena. To, naravno, želimo da izbegnemo i da u tom slučaju poruku prosledimo nekom drugom slobodnom potrošaču. Upravo zbog toga potrošač ima mogućnost da pošalje poruku kojom obaveštava RabbitMQ brokera da je poruka primljena, obrađena i da može biti izbrisana iz reda. Ukoliko broker ne primi tu poruku znači da je potrošač prestao sa radom i da poruka treba biti prosleđena nekom drugom potrošaču.
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
Kod iznad je sličan kao u klasi Recv
u primeru HelloWorld
, samo što smo dodali deo gde nakon završene obrade poruke funkcijom channel.basicAck
šaljemo potvrdu serveru. Takođe je potrebno da isključimo automatsko slanje potvrde postavljanjem flag
-a autoAck
na false
. Moramo napomenuti da potvrda mora biti poslata preko istog kanala preko koga je primljena poruka, u suprotnom, rezultat će biti izuzetak vezan za protokol nivoa kanala. Praćenje potvrda možemo vršiti pomoću komande:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Prvi broj u rezultatu označava broj preostalih poruka za slanje, dok drugi broj označava broj potvrda koje čekamo.
Timeout: 60.0 seconds ... Listing queues for vhost / ... hello 0 1
Ovaj izlaz nam govori da je potrošač primio poruku, ali još nije potvrdio da je obrada završena. U narednom izlazu vidimo da se poruka i dalje nalazi u redu i da čeka da bude prosleđena narednom potrošaču, što znači da je prethodni potrošač prestao sa radom i nije obradio poruku do kraja.
Timeout: 60.0 seconds ... Listing queues for vhost / ... hello 1 0
Message durability
Slanje potvrda brokeru o prijemu poruke sprečava gubitak poruke iz reda ukoliko neki potrošač prestane sa radom i ne izvrši obradu poruke do kraja. Međutim, i dalje postoji mogućnost gubitka poruka ako RabbitMQ server prestane sa radom. Kada RabbitMQ prestane sa radom, bilo zbog neke greške, nestanka struje ili namernog isključenja, zaboraviće sve redove i poruke u njima, osim ako mu ne kažemo drugačije. Prvo, moramo da obezbedimo da RabbitMQ nikada ne izgubi naš red. Da bismo to uradili moramo proglasiti da je naš red trajan.
channel.queue_declare(queue='hello', durable=True)
U primeru HelloWorld
već smo deklarisali red “hello”
i on postoji na brokeru od trenutka pokretanja naših primera tako da bi ova komanda u našem slučaju rezultovala greškom. To je zato što RabbitMQ ne dozvoljava redefinisanje redova sa drugačijim parametrima. Zato ćemo deklarisati novi red (oba programa iz našeg primera moraju deklarisati isti red).
channel.queue_declare(queue='task_queue', durable=True)
Sada smo sigurni da će naši redovi postojati i nakon ponovnog pokretanja RabbitMQ-a. Drugi korak podrazumeva da obezbedimo trajnost poruka. Za svaku poruku poslatu brokeru definišemo da želimo da bude trajna tako što vrednost atributa delivery_mode
postavljamo na 2. Kada je u pitanju trajnost poruka, ona ne može u potpunosti biti zagarantovana na ovaj način. Naime, uvek će postojati taj kratak vremenski period koji protekne od trenutka prihvatanja poruke do snimanja poruke na disk. Osim toga, RabbitMQ neće izvršavati upisivanje na disk za svaku poruku. Poruke će u većini slučajeva biti snimane u keš, a onda zajedno upisivane na disk.
Ravnopravno prosleđivanje poruka
Već smo pomenuli da prosleđivanje poruka funkcioniše tako što se svaka n-ta poruka prosleđuje n-tom potrošaču. Uzmimo sada primer u kome imamo dva potrošača, svaka neparna poruka je velika i prvi potrošač će biti konstatno zauzet, a svaka parna poruka je mala i drugi potrošač gotovo da neće imati posla. Ovo se dešava zbog toga što RabbitMQ samo prosleđuje poruke, ne gledajući broj nepotvrđenih poruka za potrošača. On se “na slepo” pridržava svog standardnog načina prosleđivanja.
Ovde možemo iskoristiti funkciju channel.basic_qos
kojoj ćemo proslediti parametar prefetch_count=1
. Na taj način govorimo RabbitMQ serveru da ne šalje poruku potrošaču ukoliko on još uvek obrađuje prethodnu, bez obzira na to da li je on na redu, već da prosledi poruku prvom slobodnom potrošaču.
Publish/Subscribe
Već smo opisali potpuni model razmene poruka na RabbitMQ brokeru, pri čemu smo rekli da proizvođač nikada ne šalje poruku direktno u red, već je prosleđuje na razmenu, gde se poruka usmerava na redove u zavisnosti od vrednosti ključa za rutiranje (odnosno, atributa zaglavlja ili šema za rutiranje, zavisno od tipa razmene). Sada ćemo videti kako se kreiraju razmene, veze i kako se vrši rutiranje poruke na sve prijavljene redove. Za početak kreiraćemo razmenu tipa fanout
.
channel.exchangeDeclare("logs", "fanout");
Prilikom slanja poruke navodimo samo naziv razmene:
channel.basicPublish( "logs", "", null, message.getBytes());
Neophodo je i da kreiramo vezu između razmene i reda:
channel.queueBind(queueName, "logs", "");
Treći parametar je ključ za rutiranje, međutim, njegova vrednost će svakako biti ignorisana zbog tipa razmene. Komande za listanje razmena i veza na serveru su redom:
sudo rabbitmqctl list_exchanges rabbitmqctl list_bindings
U ovom slučaju potrošači kreiraju redove i povezuju ih sa razmenom.
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");
Svaki potrošač kreira zaseban red, a svakom redu biva prosleđena poruka koja stigne na razmenu. Na taj način, svaki potrošać će primeti po primerak jedne iste poruke, što u suštini predstavlja publish/subscribe šablon.
Pozivanje udaljene procedure
Takozani,* RPC* (Remote procedure call) šablon je veoma korišćen u programiranju i zasniva se na pozivanju metode koja se nalazi na udaljenom kompjuteru i čekanju rezultata. U narednom primeru, ilustrovaćemo rad RPC servisa preko RabbitMQ servera. Klijent će imati metodu call
koje će slati RPC zahtev serveru i čekati na odgovor.
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
Klijent šalje serveru poruku sa zahtevom, a server odgovara tako što RabbitMQ serveru prosleđuje poruku sa odgovarajaćim odgovorom.
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
Na ovaj način se za svaki RPC zahtev kreira novi red, što nije dovoljno efikasno. Bolji i efikasniji način je da svaki klijent kreira svoj red. To sa sobom povlači nove probleme. Kako ćemo znati na koji se zahtev ondosi na odgovor koji je pristigao na red? Srećom, AMQP protokol nam omogućava da uključimo atribute u poruku, a u ovoj situaciji ćemo iskoristiti correlationId
, kome ćemo dodeliti novu jedinstvenu vrednost za svaki zahtev. Kasnije, kada poruka pristigne, na osnovu tog id-a vrlo lako možemo da uparimo zahtev i odgovor. Na ovaj način sprečavamo i prihvatanje dupliranih odgovora.
Klijent šalje RPC zahtev, tačnije šalje poruku serveru sa dva atributa: replayTo
, koji je setovan na ekskluzivni red kreiran samo za prihvatanja zahteva, i correlationId
, koji je setovan na jedinstvenu vrednost za svaki zahtev. Zahtev se šalje na rpc_queue
red. Server osluškuje taj red čekajući novi zahtev. Kada se zahtev pojavi, server ga obrađuje i klijentu šalje poruku sa rezultatom na red replyTo
. Klijent oslučkuje red repalyTo
. Kada poruka stigne proverava correlationId
atribut. Ako se slaže sa vrednošću koja je generisana pri slanju zahteva nastavlja sa obrađivanjem poruke. Ovaj primer ćemo prikazati pomoću dve klase. U klasi RPCServer
uspostavicemo konekciju, kanal i deklarisati red. Ukoliko želimo da pokrenemo više od jednog server procesa, onda moramo setovati promenljivu prefetchCount
u metodi channel.basicQos()
kako bi savki server dobijao isti broj zahteva. Koristimo basicConsume
da pristupimo redu, gde kreiramo objekat DefaultConsumer
koji u pozadini vrši proračune i šalje odgovor nazad. Klijentski kod je nešto zahtevniji (klasa RPSClient
). Prvo, uspostavljamo konekciju i kanal. Metoda call()
je zadužena za slanje RPC zahteva. Tu se kreira correlationId
i pamti u memoriji kako bi kasnije bio iskorišćen za prihvatanje odgovarajućeg odgovora. Zatim kreiramo eksluzivni red za odgovore i prijavimo se na njega. Na kraju šaljemo poruku sa setovanim atributima replyTo
i correlationId
. S obzirom na to da se isporuka desava u zasebnoj niti moramo obezbediti da i glavna nit bude suspendovana dok odgovor ne stigne. Jedno od rešenja jeste upotreba blokirajućih redova. U ovom slučaju kreiramo ArrayBlockingQueue
sa kapicitetom postavljenim na 1 zato što ćemo čekati uvek na jedan odgovor. Metod handleDelivery
je zadužen za proveru odgovora, tj. da li se correlationId
primljenog odgovora poklapa sa zahtevom. Ako se poklapa, smeštamo ga u blokirajući red. U isto vreme glavna nit čeka na odgovor kako bi ga preuzela sa reda. Glavna nit preuzima poruku i prikazuje odgovor korisniku. Dizajn prikazan u prethodnom primeru nije jedina moguća implementacija RPC servisa, ali ima mnoge prednosti:
- ako RPC server previše spor možemo izvršiti skaliranje tako što ćemo dodati još jedan server;
- na klijentskoj strani, RCP zahteva slanje i primanje samo jedne poruke. Rezultat toga je da je RPC klijentu dovoljan samo jedan povratan put kroz mrežu za jedan RPC zahtev.
Paralelno programiranje upotrebom RabbitMQ redova za poruke
Prethodni primer sa pozivanjem udaljene procedure možemo proširiti tako da se izračunavanje koje generiše rezultat za instace RPCClient
klase raspodeli na sve pokrenute instance RPCServer
klase. U ovom primeru koristićemo upravo tu Monte Karlo metodu za određivanje broja ????. Generisaćemo n tačaka (x, y) gde ćemo vrednosti x i y uzimati metodom slučajnog uzorka. U RPCServer
klasi ćemo definisati novu metodu monteCarlo, gde će promenljiva in predstavljati broj tačaka koje zadovoljavaju uslov x2 + y2≤ 1.
private static int monteCarlo() { int in = 0; double x, y; Random random = new Random(); for(int i = 0; i < n_samples; i++) { x = random.nextDouble(); y = random.nextDouble(); if (x*x + y*y <= 1) in++; } return in; }
U RPCClient
takođe pravimo izmene. Umesto jedne poruke u metodi call
poslaćemo n poruka i čekati dok sve ne budu primljene, a onda cemo u glavnoj niti ispisati rezultat. Metoda call
sada izgleda ovako:
public double call() throws IOException, InterruptedException { Channel channel = connection.createChannel(); List corrIds = new ArrayList(); String replyQueueName = channel.queueDeclare().getQueue(); String message; for(int i = 0; i < n_scale; i++) { corrIds.add(UUID.randomUUID().toString()); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrIds.get(i)) .replyTo(replyQueueName) .build(); message = Integer.toString(i); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); } final BlockingQueue response = new ArrayBlockingQueue(10); String ctag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { for (String corrId : corrIds) { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } } }); String in; int in_global = 0; for (int i = 0; i < n_scale; i++) { in = response.take(); in_global += Integer.parseInt(in); } Double result; result = 4.0*in_global/(10000.0*n_scale); channel.basicCancel(ctag); try { channel.close(); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } return result; }
U RPCServer
klasi smo već setovali channel.basicQos(1)
pa znamo da će sve poruke biti ravnomerno raspoređivane na servere. S obzirom na to, genrisanje slučajnih tačaka ivzvršavaće se paralelno na onoliko servera koliko je dostupno u datom trenutku. Odgovor na svaku poruku je broj tačaka koje zadovolljavaju gore pomenuti uslov. Broj ???? se onda računa po formuli:
????≈ (4 * Ninner) / Ntotal
Gde je Ninner ukupan broj pogodaka, a Ntotal ukupan broj uzoraka. Primer sa upotrebom RabbitMQ redova za poruke u svrhu paralelnog izračunavanja Monte Carlo metode za određivanje broja ???? je jedan od jednostavnijih primera paralelnog programiranja uopšte pošto nema nikakve komunikacije između paralelnih procesa, u suprotnom kod RPCServer
klase bio bi mnogo zahtevniji i komplikovaniji.
Poređenje performansi
Postoji mnogo softverskih rešenja i bibloteka koje omogućavaju komunikaciju između različitih procesa, međutim, nisu sva podjednako efikasna u rešavanju raznovrsnih problema. Na izbor rešenja utiču mnogobrojni faktori, od toga koji tip komunikacije imamo, da li šaljemo manje ili veće pakete, do toga koliko nam je bitna garancija isporuke ili latencija. Uzmimo opet za primer izračunavanje Monte Carlo metode za određivanje broja ????. Korišćenjem RabbitMQ klijenta bez većih problema smo implementirali podelu izračunavanja na više servera i time znatno smanjili vreme čekanja na rezultate. Kada imamo jednog klijenta i samo jedan aktivni server koji izvršava slučajno uzorkovanje 1000000000 puta rezultat možemo očekivati za nekih 77.0 sekundi.
PI = 3.141596724 Elapsed time is 77.0 PI25 - PIour = -4.070410206669095E-6
Ako dodamo još jedan server vreme izvršavanje će biti skoro dva puta kraće zato što sada imamo dva servera koja paralelno obrađuju po 500000000 uzorkovanja.
PI = 3.141580376 Elapsed time is 38.0 PI25 - PIour = 1.2277589793274757E-5
Međutim, sa četiri servera nećemo dobiti četiri puta kraće vreme izvršavanja pošto se određeno vreme gubi u slanju, primanju i raspoređivanju poruka.
PI = 3.141490404 Elapsed time is 26.0 PI25 - PIour = 1.0224958979332399E-4
Pogledajmo sad performanse programa pisanog korišćenjem MPI (Message Passing Interface) biblioteke, gde MPI predstavlja prenosivi standard razmene poruka namenjen upotrebi na paralelnim računarskim arhitekturama. Program pisan po MPI standardu u našem slučaju podrazumeva pokretanje n procesa koji će obavljati isti posao kao i RPCServer
, s tim da ne postoji poseban program koji će predstavljati klijenta već je nulti proces taj koji nam vraća vrednost broja ????. Kada pokrenomo samo jedan MPI proces koji izvršava 100000000 uzorkovanja dobijamo vreme izvršavanja od 69 sekundi, što je za skoro 10 sekundi kraće vreme izvršavanja u odnosu na RabbitMQ rešenja sa jednim aktivnim serverom.
PI = 3.1416036119999999343121999 Elapsed time is 69.307003 PI25 - PIour = -0.0000109584102068183142364
Kada pokrenemo dva procesa opet dobijamo nešto kraće vreme izračunavanja u odnosu na RabbitMQ implentaciju.
PI = 3.1415385960000001830394467 Elapsed time is 35.220628 PI25 - PIour = 0.0000540575897929329585168
Sa četiri aktivna MPI procesa dobijamo nešto duže vreme izračunavanja u odnosu na RabbitMQ rešenje što može biti uzrokovano sinhronizacijom MPI procesa i troškovima razmene poruka između većeg broja procesa.
PI = 3.1415493319999998611535830 Elapsed time is 27.595450 PI25 - PIour = 0.0000433215897932548443805
Ono što je bitno napomenuti je to da je MPI pre svega namenjen radu na klasterima sa brzom i pouzdanom mrežom, i u tom slučaju je mnogo bolji izbor izbor ako se razvija aplikacija koja treba paralelno da izvršava kompleksne naučne proračune. RabbitMQ ćemo pre koristiti kad nam je potrebna komunikacija između različitih udaljenih aplikacija ili aplikacija koje funkcionišu po principu proizvođač/potrošač, ali i kada nam je potrebna veća otprnost na greške i mrežnu nestabilnost kao kada su u pitanju distribuirani sistemi.
Zaključak
RabbitMQ rešenje za razmenu poruka je primenljivo u mnogim situacijama. Najčešće se koristi kako bi omogućio veb servisima da na zahtev odgovore brzo, umesto da budu prinuđeni da izvršavaju zahtevne operacije dok korisnici čekaju na rezultat. Koristi se i za distribuiranje poruke ka više klijenata, ali i za balansiranu raspodelu poruka među potrošačima. Konkretni slučajevi korišćenja mogu biti:
- Aplikacija mora da radi sa bilo kojom kombinacijom postojećih protokola kao što su AMQP0-9-1, STOMP, MQTT, AMQP 1.0.
- Potrebna je bolja kontrola konzistentnosti na osnovu poruka.
- Aplikaciji su potrebne različite vrste razmene poruka (point to point, request / reply, publish/subscribe)
- Kompleksno rutiranje ka potrošačima, integrisanje više servisa/aplikacija sa netrivijalnom logikom rutiranja.
Dodatak
Kodovi klasa korišćenih u primerima:
- HelloWorld
Send.java
Recv.java
- Work Queues
NewTask.java
Worker.java
- Publish/Subscribe
EmitLog.java
ReceiveLog.java
- Routing
EmitLogDirect.java
ReceiveLogsDirect.java
- Topics
EmitLogTopic.java
ReceiveLogsTopic.java
- Remote procedure call
RPCServer.java
RPCClient.java
- Monte Carlo (RPC)
RPCClient.java
RPCServer.java
- Monte Carlo (MPI)
monteCarlo.c