.Net Core 3.1 ile MassTransit + RabbitMQ

Feyyaz Acet
9 min readMar 14, 2020

--

Masstransit

Merhaba arkadaşlar, Microservice’lerin trend topic olduğu bu günlerde asenkron mesajlaşmayı sağlayan message broker’ları kullanırken ihtiyacımız olan ve başımızı ağrıtan konuları çözüp paket olarak sunan MassTransit projesi ile ilgili tecrübelerimi paylaşmak istedim.

RabbitMq: asenkron mesajlaşmayı sağlayan mesaj broker dır. Başka bir deyişle ortak platformlar için mesajlaşmayı sağlayan bir mesaj-quedur.

Masstransit: .Net teknolojilerini kullanarak dağınık uygulamalar oluşturmak için kullanılan free, open soruce, lightweight bir message bus dır. Asenkron mesasjlamak için kullandığımız message broker’lar ile uygulamamız arasında bir abstraction yapıdır da diyebiliriz.
Bir çok yerde ESB(enterprise service bus) olarak geçmesine karşın(wikipedia da dahil), kendileri biraz mütevazi davaranak ESB olmadıklarını her fırsatta dile getirmektedir. MassTransit, Rabbitmq ile birlikte Azure Service Bus, ActiveMQ ve Amazon SQS/SNS message-queue’ları desteklemektedir.
Bunlar arasındaki geçişlerde herhangi bir ekstra kod yazmaya ihtiyaç duymamaktasınız.

Peki Masstransit nerede devreye girer? Niye ihtiyaç duyarız?
Microservice architecure oluşturup iş parcacıklarınızın bağımlıklarını kopardığınız veya bir şekilde uygulama akışınızda bazı işleri asenkron yapmaya karar verdiniz. İlk önce bir message broker seçmelisiniz.(Örneğimizde bu Rabbitmq olacak.)Bir message broker kullanmaya başladığınızda zamanla bir çok challange ile karşı karşıya kalacaksınız. Hata yönetimi, yeniden deneme, bekletme, transaction. İşte tamda bu noktada Chris Patterson ve Dru Sellers abimiz 2007 yılında alt.net te tanışmışlar ve aynı problemler ile karşı karşıya kaldıklarını görmüşler. Bu abilermiz hem kendileri için hemde başkaları bu kadar uğraşmasın diye sağolsunlar bu projeyi başlatmışlar:)

Masstransit Avantajları nelerdir.
Opensource ve free
Kullanımı kolay
İçerisinde bir çok microservice-message patterni(retry, circuit breaker, outbox gibi) barındırmaktadır.
Exception Handling built-in olarak gelmektedir.
Distributed transaction(Saga,event-driven state machines, routing-slip)
Test edilebilir
Monitoring

Bilmeniz Gerekenler

Messages

Masstransit içerisinde mesajlar .Net System Type’larını kullanır. Message Type class veya interface olabilir. Mastransit read-only properties ve içerisinde method olmayan tipleri tavsiye eder. Bu framework’ü geliştiren abilerimiz önceki tecrübelerine istinaden en uygun tip olarak interface’i önerir. Hatta biraz daha iler gidip, bir çok yazılımcının ortak hata olarak mesaj contractı base class olarak tanımlayıp consumer tarafında base class’ı dispatch ettiğini ve sonrasında bu sürecin sancılı olup acı çektiğini belirtiyorlar.

MassTransit içersinde mesajı iletmenin 2 yolu vardır. Publish ve Send. Eğer Event göndereceksiniz Publish, Command da ise Send kullanmanız gerekmektedir. Command gönderirken endpoint(hangi queue’ya gideceğini) belirtiyoruz. Ancak Event için bunu yapmıyoruz. Publish, Publish-Subscribe patterni’ni izler. Yani biz eventi publish ettiğimizde, event’imize abone olan tum quue’lara mesaj iletilmektedir.

Not: Publish-Subcribe pattern’i hakkında bilgi almak isteseniz buraya alalım.

Mesajları isimlendirme (Message Names)

2 çeşit message type bulunmaktadır. Event ve Command. Hangi işi yaptığına göre bunu belirten isimlendirmeler verilmesi gerekmetedir.(zorunlu değildir)

Events

Eventler direk enpoint’e iletmek yerine publish edilmelidir. Bir şeylerin olduğu anlamına gelir. İsimlendirme yapılırken geçmiş zaman dili kullanılması ile birlikte birşeylerin yapıldığının belirtmesi gerekmektedir.

Örnek Event İsimlendirmeleri

  • CustomerAddressUpdated
  • CustomerAccountUpgraded
  • OrderSubmitted,
  • OrderAccepted,
  • OrderRejected,
  • OrderShipped

Command

Bir şeylerin yapılması istendiğinde kullanılır. Emir kipi kullanılması tavsiye edilir.

Örnek Command İsimlendirmeleri

  • RegisterOrder,
  • UpdateCustomer
  • ChangeOrderState

Konunun dışına çıkmamak adına taraflar ile ilgili ek bir açıklama daha getirmeyeceğim. Rabbitmq ile ilgili tarafları veya uygulama akışını bilmek isteyen arkdaşları buradaki yazıma alabilirim.

Konuyu çok fazla uzatmadan uygulamamızdan bahsetmek istiyorum. Geliştirme esnasında kullanılan yapı hakkında ayrıca açıklamalar yapacağım. Şu aralar danışmanlığını yaptığım bir firmının sürecinden kısa bir örnek vereceğim. Öncelikle bir webapi projesi oluşturup buradan talepleri alacağım. Daha sonra bu talebi kaydetmek için queue’ya atacağım. Consumer tarafında ise talep kayıt işlemi yapıldıktan sonra oluşan bu talebin thirdparty bir service’e entegre olması ve Müşteri Temsilcisine mail atması için bir event oluşturcağım. Son olarak bir eventi işlemek adına 2 consumer (ThirPartyService ve NotificationService) oluşturacağım.

Evet hadi başlayalım…

Uygulamayı windows platformunda visual studio comminuty edition üzerinde .net core 3.1 ile geliştireceğim. Ek olarak rabbitmq kurulu olması gerekmektedir. Ben docker ile ayağa kaldıracağım. Siz kendiniz yükleyebilirsiniz ya da benimle birlikte adımaları takip edebilirsiniz.

Rabbitmq’yu docker ile ayağa kaldırarak başlayalım. Aşağıdaki komutu çalıştırdığınız rabbitmq image’i local bilgisayarınıza indirip container olarak ayağa kaldıracaktır.

docker run -d --hostname my-rabbit --name myrabbit -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=123456 -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Browserdan http://localhost:15672/ adresine girmeyi denediğinizde aşağıdaki ekran açılıyor ise problem yok demektir. Rabbitmq önetim paneli ile ilgili detaylı bilgiye girmeyeceğim. Merak ediyorsanız buraya alalım.

Projede ayrı bir virtualhost üzerinde çalışmak için aşağıdaki gibi demand adında bir virtualhost oluşturuyorum. Virtual host’u yapacağımız application için kullancağımız ayrı bir alan/url olarak düşünebilirsiniz.

Öncelikle DemandMenagement adında bir blank solution oluşturuyoruz. Akabinde talebi almak için DemandMenagement.Api (Web Api), Talebi database’e asenkron kaydetmek için DemandManagement.Registration(console), talebi thirdparty bir projeye entegre etmek için DemandManagement.Thirdparty.Service(console), aynı şekilde talebi müşteri hizmetlerine bildirmek için DemandManagement.Notification(console) ve son olarak ortak type(message contract) kullanmak için DemandManagement.MessageContracts(library) projesini oluşturuyorum. Solution’un son resmi aşağıdaki olmalıdır.

Yazmaya ortak olarak kullanacağımız interface’leri tanımlamak ile başlıyorum. DemandManagement.MessageContracts projeninin içine, talebi kaydetmek için IRegisterDemandCommand, kaydedilen talebi dinleyen consumer’ları bilgilendirmek için IRegisteredDemandEvent interface’ini oluşturdum. Bunlarla birlikte bus’ı yönetmek için static olarak BusConfigurator class’ını, rabbitmq erişim ve queue bilgilerini yönetmek içinde ise RabbitMqConsts class’ını tanımladım. Şimdi sıra geldi Nuget’ten Masstrainsit.RabbitMQ paketini yüklemeye.(Bu paketi tüm projelerimize yüklüyoruz.) Yine aynı şekilde tüm projelerimiz DemandManagement.MessageContracts projesini referans olarak ekleyecektir.

BusConfigurator Class

BusConfigurator class’ının son hali aşağıdaki gibidir.

RabbitMqConsts class’ının son hali aşağıdaki gibidir.

RabbitMqConsts.cs

Sıra geldi api tarafını yazmaya. Api projesinin ana dizinine Model adında folder create edip, talebi post edebilmek için RegisterDemandModel oluşturdum. Öncelikle DemandControler adında bir controller ekliyorum. Ve sadece talebi post edebileceğim şekilde ayarlıyorum.

DemandController’ın son hali aşağdaki gibidir.

RegisterDemandModel class’ının son hali aşağıdaki gibidir.

RegisterDemandModel.

Post action’ı içerisinde gelen requesti queue’ya iletmek için BusConfigurator class’ından yararlanıyorum. Bununla birlikte talebi queue’ya iletmek için bir endpoint oluşturup bu enpoint üzerinden mesajı queue’ya send ediyoruz. Post Action son hali aşağıdaki gibidir.

Daha sonra DemandManagement.Registration projesine geçiyoruz. Projemize DemandManagement.MessageContract projesini referans olarak ekleyip MassTransit.RabbitMQ paketinine Nuget’ten yüklüyoruz.

Program.cs main methodunda ServiceBus’ı ayağa kaldırıp registerdemand.service queue’ya abaone olup dinlemeye başlıyoruz. İlgili queue’ya kayıt geldiği anda buraya push edilecektir. Mesajı işlemek adına RegisterDemandCommandConsumer adında bir class oluşturuyoruz.

Program.cs main method içeriği
RegisterDemandCommandConsumer.cs içeriği

Şimdi postman ile ilk talebimizi queue’ya iletmekle birlikte DemandManagement.Registration tarafında da talebi consume edeceğiz.

Öncelikle solution üzerinde sağ click properties yapıp Startup Project sekmesinden multiple startup projects olarak işaretleyip api ve registiration projelerini start olarak seçiyoruz. Daha sonra F5 ile projelerimizi ayağa kaldırıyoruz.

Postman’i açıyoruz. Postman ile aşağıdaki şekilde talebimizi post ediyoruz.

Postman:Restful rest servislerimizi kolaylıkla call edebileceğimiz bir uygulama.

Post ile birlikte oluşan sonuç aşağıdaki gibidir.

Postman post sonucu
Registiration consumer console. İşlenen talep ve Id bilgileri.

Rabbitmq tarafında demand virtualhost altında registerdemand.service adında bir queue oluşturulmuş. Bunu MassTransit bizim yerimize otomatik olarak oluşturdu. Yine aynı şekilde mesajları yönetmek için otomatik olarak bir kaç exchange oluşturmuş. Masstransit default olarak Fanout exchange tipini kullanmaktadır. Exchange’ler hakkında detaylı bilgi almak isteyen var ise buraya alayım.

Projemizin ikinci aşamasında ise registerdemand.service tarafında işlenen talep için oluşan Id ile birlikte event fırlatacağız. Yine bu event’e abone olan thirdparty ve notification tarafında aynı eventi ayrı olarak işliyor olacağız.

Öncelikle RegisterDemandCommandConsumer.cs tarafında bir event oluşturup queue’lara iletiyor olacağız.

RegisterDemandCommandConsumer.cs son hali aşağıdaki gibidir.

RegisterDemandCommandConsumer.cs

Sıra geldi DemandManagement.Thirdparty.Service ve DemandManagement.Notification tarafında IRegisteredDemand eventine abone olup beklemeye. Aslında burada da olay çok basit. Registration projesinde olduğu gibi Service Bus’ı ayağa kaldırıp ilgili queue’yu dinlemek yeterli olacaktır. Thirdparty.Service için thirdparty.service, notification için notification.service queue’yu dinleyeceğiz.

Bunlar için her iki projeye de MessageContract projesini referans olarak ekleyip, MassTransit.RabbitMQ paketini de yüklüyor olacağız. Yine aynı şekilde event ile birlikte gelen mesajı kullanmak için her iki projede de consumer class’ımızı oluşturuyoruz. İki projenin pragram.cs ve consumer class’larının son hali aşağıdaki gibidir.

DemandManagement.Notification için

Program.cs main
DemandRegisteredEventConsumer

DemandManagement.ThirdParty için

Program.cs main
DemandRegisteredEventConsumer

Evet kodlamayı bitirdik. Şimdi teste geçebiliriz. MessageContract hariç tüm projleri ayağa kalkacak şekilde ayarlıyoruz.

Şimdi projemizi ayağa kaldırabiliriz. Aşağıdaki gibi bir ekran ile karşılaştıysanız gayet doğru gidiyorsunuz demektir :)

Bir bir önceki Postman ekranındaki gibi yine post işlemini yapıyoruz. Ve sonuç aşağıdaki gibidir.

Muhtemelen aklınıza takılmıştır. DemandRegisterCommand gönderirken direk queue belirttik ancak neden DemandRegisteredEvent gönderirken belirtmedik ve bu event’e subscribe olan queue’lara mesajı nasıl iletti?İşte cevap MassTransit’in neden default olarak Fanout exchange kulladığıdır. Bir önceki yazımda detayları bulabilirsiniz.

Şimdiye kadar herşey çok güzel. Happy path’i tamamladık. MassTransit bizim yerimize channel, exchange gibi iletişim süreçlerini yönetmesi yanında consumer tarafında da bir çok kolaylık sağlamaktadır.

Diyelim ki database bazen çok fazla yük altında kalabiliyor. Bu durumda bizde biraz daha yüklenip deadlock oluşturmayalım. MassTransit içerisinde CircutBreaker middleware’ini build olarak barındırmaktadır. Tek yapmamız aktif edip ihtiyacımız doğrultusunda implement etmek. DemandManagement.Registration içerisinde program.cs main methodu içeriğinin son hali aşağıdaki gibidir.

Not: Circut Breaker pattern’i hakkında detaylı bilgi almak için Martin Fowler’ın makalesini inceleyebilirsiniz.

DemandManagement.Registration program.cs main

TripThreshold ile alınan taleplerimizin %15'inde ve ActiveThreshold ile üst üste 10 hata alındğında 5 dk beklemesi gerektiğini ResetInterval ile belirtiyoruz. TrackingPeriod ise ResetInterval süresinden sonra 1 dk daha tetikte beklemesi söylüyor. Bu sürede yine hata alınır ise ActiveThreshold ve TripThreshold limitlerini beklemeden yine 5 dk süre ile beklemeye geçecektir.

Olmaz(tabi ki oluyor :)) ama diyelim ki bazen DemandManagement.Notification servisimizde bağlantı kopuyor vs geçiçi hatalar ile karşıyoruz. Normalde MassTransit mesajlarda kayba uğramamak ile birlikte queue akışını da bozmamak adına hata aldığı mesajları, [hata aldığı queue ismi].error adında bir queue oluşturup buraya atmaktadır. Biz hatanın geçici olduğunu biliyoruz ve belirtiğimiz miktar ve aralıkta denemesini istiyor isek bu durumda UseMessageRetry extension methodunu kullanbiliyoruz.

Not: retry pattern olarak aratıp detaylı bilgiye ulaşabilirsiniz.

DemandManagement.Notification içerisinde program.cs main methodu içeriğinin son hali aşağıdaki gibidir.

DemandManagement.Notification program.cs main methodu

Bu şekilde geliştirmeyi tamamladığımızda ilgili işlemi yapmam için 5 defa deneyecektir. Hata devam ederise mesahı error queue’a atıp bir sonrakine devam edecektir. Tabi ki MassTransit nimetleri burada bitmiyor.

DemandManagement.Thirdpary.Service projesinde bir entegreden bahsetmiştik. Bir çok entegrasyonda belirli süre içinde belirli request limitleri olabiliyor. Son olarak RateLimit middleware’inden bahsetmek istiyorum. RateLimit middlewere ile belirli süre içersinde işlenecek mesaj adeti verebiliyoruz. Servise 1 dk içerisnde 1000 request yapabileceğimizi varsayarak düzenlemeyi yapıyorum.

Geliştirme sonrası DemandManagement.Thirdpary.Service içerisinde program.cs main methodu içeriğinin son hali aşağıdaki gibidir.

DemandManagement.Thirdpary.Service program.cs main

Sonuç

Bir şekilde yolumuz message broker’lar ile kesiştiyse ve .Net Environment içerisinde çalışıyor isek karşılacağımız problemleri yönetmek ve kullanım kolaylığı açısından MassTransit iyi bir ESB(enterprise service bus) alternatifi olarak karşımıza çıkıyor.

Projenin kodlarına buradan ulaşabilirsiniz.

Umarım faydalı olmuştur. Bir sonraki makalede görüşmek üzere, sağlıcakla kalın.

Kaynaklar

--

--