Cara menggunakan Redis untuk pemrosesan streaming waktu nyata

Roshan Kumar adalah manajer produk senior di Redis Labs.

Penyerapan data streaming real-time adalah persyaratan umum untuk banyak kasus penggunaan big data. Dalam bidang-bidang seperti IoT, e-commerce, keamanan, komunikasi, hiburan, keuangan, dan ritel, di mana banyak hal bergantung pada pengambilan keputusan berdasarkan data yang tepat waktu dan akurat, pengumpulan dan analisis data secara real-time sebenarnya merupakan inti dari bisnis ini.

Namun, mengumpulkan, menyimpan, dan memproses data streaming dalam volume besar dan dengan kecepatan tinggi menghadirkan tantangan arsitektur. Langkah pertama yang penting dalam menyampaikan analisis data waktu nyata adalah memastikan bahwa jaringan, komputasi, penyimpanan, dan sumber daya memori yang memadai tersedia untuk menangkap aliran data yang cepat. Tetapi tumpukan perangkat lunak perusahaan harus sesuai dengan kinerja infrastruktur fisiknya. Jika tidak, bisnis akan menghadapi tumpukan data yang sangat besar, atau lebih buruk lagi, data yang hilang atau tidak lengkap.

Redis telah menjadi pilihan populer untuk skenario penyerapan data yang cepat. Platform database dalam memori yang ringan, Redis mencapai throughput dalam jutaan operasi per detik dengan latensi sub-milidetik, sambil menggunakan sumber daya minimal. Ia juga menawarkan implementasi sederhana, yang dimungkinkan oleh berbagai struktur data dan fungsinya.

Dalam artikel ini, saya akan menunjukkan bagaimana Redis Enterprise dapat menyelesaikan tantangan umum yang terkait dengan konsumsi dan pemrosesan data kecepatan tinggi dalam jumlah besar. Kami akan membahas tiga pendekatan berbeda (termasuk kode) untuk memproses umpan Twitter secara real time, masing-masing menggunakan Redis Pub / Sub, Redis List, dan Redis Sorted Sets. Seperti yang akan kita lihat, ketiga metode memiliki peran untuk dimainkan dalam penyerapan data yang cepat, bergantung pada kasus penggunaan.

Tantangan dalam merancang solusi penyerapan data yang cepat

Penyerapan data berkecepatan tinggi sering kali melibatkan beberapa jenis kompleksitas:

  • Data dalam jumlah besar terkadang tiba-tiba tiba-tiba. Data Bursty membutuhkan solusi yang mampu memproses data dalam jumlah besar dengan latensi minimal. Idealnya, itu harus mampu melakukan jutaan penulisan per detik dengan latensi sub-milidetik, menggunakan sumber daya yang minimal.
  • Data dari berbagai sumber. Solusi penyerapan data harus cukup fleksibel untuk menangani data dalam berbagai format, mempertahankan identitas sumber jika diperlukan, dan mengubah atau menormalkan secara real-time.
  • Data yang perlu disaring, dianalisis, atau diteruskan. Sebagian besar solusi penyerapan data memiliki satu atau beberapa pelanggan yang menggunakan data tersebut. Ini sering kali merupakan aplikasi berbeda yang berfungsi di lokasi yang sama atau berbeda dengan serangkaian asumsi yang bervariasi. Dalam kasus seperti itu, database tidak hanya perlu mengubah data, tetapi juga memfilter atau menggabungkan tergantung pada persyaratan aplikasi yang memakan.
  • Data berasal dari sumber yang tersebar secara geografis. Dalam skenario ini, seringkali mudah untuk mendistribusikan node pengumpulan data, menempatkannya dekat dengan sumber. Node itu sendiri menjadi bagian dari solusi penyerapan data yang cepat, untuk mengumpulkan, memproses, meneruskan, atau mengubah rute data.

Menangani penyerapan data cepat di Redis

Banyak solusi yang mendukung penyerapan data cepat saat ini bersifat kompleks, kaya fitur, dan direkayasa secara berlebihan untuk kebutuhan sederhana. Redis, di sisi lain, sangat ringan, cepat, dan mudah digunakan. Dengan klien yang tersedia dalam lebih dari 60 bahasa, Redis dapat dengan mudah diintegrasikan dengan tumpukan perangkat lunak populer.

Redis menawarkan struktur data seperti List, Sets, Sorted Sets, dan Hash yang menawarkan pemrosesan data yang sederhana dan serbaguna. Redis memberikan lebih dari satu juta operasi baca / tulis per detik, dengan latensi sub-milidetik pada instance cloud komoditas berukuran sedang, menjadikannya sangat hemat sumber daya untuk volume data yang besar. Redis juga mendukung layanan perpesanan dan pustaka klien dalam semua bahasa pemrograman populer, membuatnya sangat sesuai untuk menggabungkan penyerapan data berkecepatan tinggi dan analitik waktu nyata. Perintah Redis Pub / Sub memungkinkannya untuk memainkan peran sebagai perantara pesan antara penerbit dan pelanggan, fitur yang sering digunakan untuk mengirim pemberitahuan atau pesan antara node penyerapan data terdistribusi.

Redis Enterprise menyempurnakan Redis dengan penskalaan yang mulus, ketersediaan yang selalu aktif, penerapan otomatis, dan kemampuan untuk menggunakan memori flash yang hemat biaya sebagai perluasan RAM sehingga pemrosesan kumpulan data besar dapat dilakukan dengan biaya yang hemat.

Pada bagian di bawah ini, saya akan menjelaskan cara menggunakan Redis Enterprise untuk mengatasi tantangan penyerapan data umum.

Redis dengan kecepatan Twitter

Untuk menggambarkan kesederhanaan Redis, kita akan mempelajari contoh solusi penyerapan data cepat yang mengumpulkan pesan dari umpan Twitter. Tujuan dari solusi ini adalah untuk memproses tweet secara real-time dan mendorongnya ke bawah pipa saat diproses.

Data Twitter yang diserap oleh solusi tersebut kemudian dikonsumsi oleh beberapa prosesor di masa mendatang. Seperti yang ditunjukkan pada Gambar 1, contoh ini berkaitan dengan dua prosesor - Prosesor Tweet Bahasa Inggris dan Prosesor Influencer. Setiap prosesor menyaring tweet dan meneruskannya ke saluran masing-masing ke konsumen lain. Rantai ini dapat mencapai solusi yang dibutuhkan. Namun, dalam contoh kami, kami berhenti di tingkat ketiga, di mana kami mengumpulkan diskusi populer di antara penutur bahasa Inggris dan pemberi pengaruh teratas.

Redis Labs

Perhatikan bahwa kami menggunakan contoh pemrosesan umpan Twitter karena kecepatan kedatangan data dan kesederhanaan. Perhatikan juga bahwa data Twitter mencapai penyerapan data cepat kami melalui satu saluran. Dalam banyak kasus, seperti IoT, mungkin ada beberapa sumber data yang mengirimkan data ke penerima utama.

Ada tiga cara yang mungkin untuk mengimplementasikan solusi ini menggunakan Redis: serap dengan Redis Pub / Sub, serap dengan struktur data Daftar, atau serap dengan struktur data Sorted Set. Mari kita periksa masing-masing opsi ini.

Serap dengan Redis Pub / Sub

Ini adalah implementasi paling sederhana dari penyerapan data cepat. Solusi ini menggunakan fitur Redis's Pub / Sub, yang memungkinkan aplikasi untuk menerbitkan dan berlangganan pesan. Seperti yang ditunjukkan pada Gambar 2, setiap tahap memproses data dan menerbitkannya ke saluran. Tahap berikutnya berlangganan ke saluran dan menerima pesan untuk diproses atau disaring lebih lanjut.

Redis Labs

Pro

  • Mudah diimplementasikan.
  • Berfungsi dengan baik ketika sumber data dan pemroses didistribusikan secara geografis.

Kontra 

  • Solusinya mengharuskan penerbit dan pelanggan selalu siap. Pelanggan kehilangan data saat dihentikan, atau saat koneksi terputus.
  • Ini membutuhkan lebih banyak koneksi. Sebuah program tidak dapat menerbitkan dan berlangganan ke koneksi yang sama, jadi setiap pemroses data perantara memerlukan dua koneksi - satu untuk berlangganan dan satu lagi untuk menerbitkan. Jika menjalankan Redis pada platform DBaaS, penting untuk memverifikasi apakah paket atau tingkat layanan Anda memiliki batasan jumlah koneksi.

Catatan tentang koneksi

Jika lebih dari satu klien berlangganan saluran, Redis mendorong data ke setiap klien secara linier, satu demi satu. Payload data yang besar dan banyak koneksi dapat menyebabkan latensi antara penerbit dan pelanggannya. Meskipun batas keras default untuk jumlah koneksi maksimum adalah 10.000, Anda harus menguji dan mengukur berapa banyak koneksi yang sesuai untuk payload Anda.

Redis mempertahankan buffer keluaran klien untuk setiap klien. Batas default untuk buffer keluaran klien untuk Pub / Sub ditetapkan sebagai:

pubsub client-output-buffer-limit 32mb 8mb 60

Dengan setelan ini, Redis akan memaksa klien untuk memutuskan sambungan dalam dua kondisi: jika buffer keluaran tumbuh melebihi 32 MB, atau jika buffer keluaran menampung 8 MB data secara konsisten selama 60 detik.

Ini adalah indikasi bahwa klien mengonsumsi data lebih lambat daripada dipublikasikan. Jika situasi seperti itu muncul, pertama-tama coba optimalkan konsumen sehingga mereka tidak menambahkan latensi saat mengonsumsi data. Jika Anda melihat bahwa klien Anda masih terputus, Anda dapat meningkatkan batas untuk client-output-buffer-limit pubsubproperti di redis.conf. Harap diingat bahwa setiap perubahan pada pengaturan dapat meningkatkan latensi antara penerbit dan pelanggan. Setiap perubahan harus diuji dan diverifikasi secara menyeluruh.

Desain kode untuk solusi Redis Pub / Sub

Redis Labs

Ini adalah yang paling sederhana dari tiga solusi yang dijelaskan dalam makalah ini. Berikut adalah kelas Java penting yang diimplementasikan untuk solusi ini. Unduh kode sumber dengan implementasi penuh di sini: //github.com/redislabsdemo/IngestPubSub.

The Subscriberkelas kelas inti dari desain ini. Setiap Subscriberobjek mempertahankan koneksi baru dengan Redis.

class Subscriber extends JedisPubSub implementasikan Runnable {

       nama String pribadi;

       private RedisConnection conn = null;

       pribadi Jedis jedis = null;

       private String subscriberChannel;

       Public Subscriber (String subscriberName, String channelName) melempar Exception {

              name = subscriberName;

              subscriberChannel = channelName;

              Thread t = Thread baru (ini);

              t.start ();

       }

       @Mengesampingkan

       public void run () {

              mencoba{

                      conn = RedisConnection.getRedisConnection ();

                      jedis = conn.getJedis ();

                      sementara (benar) {

                             jedis.subscribe (this, this.subscriberChannel);

                      }

              } catch (Exception e) {

                      e.printStackTrace ();

              }

       }

       @Mengesampingkan

       public void onMessage (saluran String, pesan String) {

              super.onMessage (saluran, pesan);

       }

}

The Publisherkelas mempertahankan koneksi terpisah untuk Redis untuk menerbitkan pesan ke saluran.

Public class Publisher {

       Koneksi RedisConnection = null;

       Jedis jedis = null;

       saluran String pribadi;

       public Publisher (String channelName) melempar Exception {

              channel = channelName;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();

       }

       public void publish (String msg) melempar Exception {

              jedis.publish (channel, msg);

       }

}

The EnglishTweetFilter, InfluencerTweetFilter, HashTagCollector, dan InfluencerCollectorfilter memperpanjang Subscriber, yang memungkinkan mereka untuk mendengarkan saluran masuk. Karena Anda memerlukan koneksi Redis terpisah untuk berlangganan dan menerbitkan, setiap kelas filter memiliki RedisConnectionobjeknya sendiri . Filter mendengarkan pesan baru di saluran mereka dalam satu putaran. Berikut ini contoh kode EnglishTweetFilterkelas:

public class EnglishTweetFilter extends Subscriber

{

       private RedisConnection conn = null;

       pribadi Jedis jedis = null; 

       private String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) melempar Exception {

              super (name, subscriberChannel);

              this.publisherChannel = publisherChannel;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();           

       }

       @Mengesampingkan

       public void onMessage (String subscriberChannel, String message) {

              JsonParser jsonParser = new JsonParser ();

              JsonElement jsonElement = jsonParser.parse (pesan);

              JsonObject jsonObject = jsonElement.getAsJsonObject ();

              // filter pesan: publikasikan tweet berbahasa Inggris saja           

if (jsonObject.get (“lang”)! = null &&

       jsonObject.get ("lang"). getAsString (). sama dengan ("en")) {

                      jedis.publish (publisherChannel, message);

              }

       }

}

The Publisherkelas memiliki mempublikasikan metode yang menerbitkan pesan ke saluran yang diperlukan.

Public class Publisher {

.

.     

       public void publish (String msg) melempar Exception {

              jedis.publish (channel, msg);

       }

.

}

Kelas utama membaca data dari aliran serapan dan mempostingnya ke AllDatasaluran. Metode utama kelas ini memulai semua objek filter.

public class IngestPubSub

{

.

       public void start () melempar Exception {

       .

       .

              penerbit = Penerbit baru ("AllData");

              englishFilter = new EnglishTweetFilter ("Filter Bahasa Inggris", "AllData",

                                           “EnglishTweets”);

              influencerFilter = InfluencerTweetFilter baru ("Filter Influencer",

                                           “AllData”, “InfluencerTweets”);

              hashtagCollector = HashTagCollector baru (“Hashtag Collector”, 

                                           “EnglishTweets”);

              influencerCollector = InfluencerCollector baru (“Influencer Collector”,

                                           “InfluencerTweets”);

       .

       .

}

Serap dengan Daftar Redis

Struktur data Daftar di Redis membuat penerapan solusi antrian menjadi mudah dan lugas. Dalam solusi ini, produser mendorong setiap pesan ke belakang antrian, dan pelanggan mengumpulkan antrian dan menarik pesan baru dari ujung yang lain.

Redis Labs

Pro

  • Metode ini dapat diandalkan jika terjadi kehilangan koneksi. Setelah dimasukkan ke dalam daftar, data disimpan di sana sampai pelanggan membacanya. Ini berlaku bahkan jika pelanggan dihentikan atau kehilangan koneksi mereka dengan server Redis.
  • Produsen dan konsumen tidak membutuhkan koneksi di antara mereka.

Kontra

  • Setelah data ditarik dari daftar, itu dihapus dan tidak dapat diambil lagi. Kecuali jika konsumen mempertahankan data, data tersebut akan hilang segera setelah dikonsumsi.
  • Setiap konsumen membutuhkan antrian terpisah, yang membutuhkan penyimpanan banyak salinan data.

Desain kode untuk solusi Daftar Redis

Redis Labs

Anda dapat mengunduh kode sumber untuk solusi Daftar Redis di sini: //github.com/redislabsdemo/IngestList. Kelas utama solusi ini dijelaskan di bawah.

MessageListmenyematkan struktur data Daftar Redis. The push()Metode mendorong pesan baru di sebelah kiri antrian, dan pop()menunggu untuk pesan baru dari kanan jika antrian kosong.

public class MessageList {

       dilindungi Nama string = "Daftar Saya"; // Nama

.

.     

       public void push (String msg) melempar Exception {

              jedis.lpush (nama, pesan); // Dorong Kiri

       }

       public String pop () melempar Exception {

              return jedis.brpop (0, name) .toString ();

       }

.

.

}

MessageListeneradalah kelas abstrak yang menerapkan logika pendengar dan penerbit. Sebuah MessageListenerobjek mendengarkan hanya satu daftar, tetapi dapat menerbitkan ke beberapa saluran ( MessageFilterobjek). Solusi ini membutuhkan MessageFilterobjek terpisah untuk setiap pelanggan di pipa.

class MessageListener mengimplementasikan Runnable {

       private String name = null;

       private MessageList inboundList = null;

       Map outBoundMsgFilters = HashMap baru ();

.

.     

       public void registerOutBoundMessageList (MessageFilter msgFilter) {

              if (msgFilter! = null) {

                      if (outBoundMsgFilters.get (msgFilter.name) == null) {

                             outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

       @Mengesampingkan

       public void run () {

.

                      sementara (benar) {

                             String msg = inboundList.pop ();

                             processMessage (msg);

                      }                                  

.

       }

.

       void dilindungi pushMessage (String msg) memunculkan Exception {

              Set outBoundMsgNames = outBoundMsgFilters.keySet ();

              untuk (Nama string: outBoundMsgNames) {

                      MessageFilter msgList = outBoundMsgFilters.get (nama);

                      msgList.filterAndPush (msg);

              }

       }

}

MessageFilteradalah kelas orang tua yang memfasilitasi filterAndPush()metode tersebut. Saat data mengalir melalui sistem penyerapan, data sering difilter atau diubah sebelum dikirim ke tahap berikutnya. Kelas yang memperluas MessageFilterkelas akan mengganti filterAndPush()metode, dan menerapkan logikanya sendiri untuk mendorong pesan yang difilter ke daftar berikutnya.

public class MessageFilter {

       MessageList messageList = null;

.

.

       public void filterAndPush (String msg) melempar Exception {

              messageList.push (msg);

       }

.

.     

}

AllTweetsListeneradalah contoh implementasi MessageListenerkelas. Ini mendengarkan semua tweet di AllDatasaluran, dan menerbitkan data ke EnglishTweetsFilterdan InfluencerFilter.

kelas publik AllTweetsListener memperluas MessageListener {

.

.     

       public static void main (String [] args) melempar Exception {

              MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (baru

              EnglishTweetsFilter (“EnglishTweetsFilter”, “EnglishTweets”));

              allTweetsProcessor.registerOutBoundMessageList (baru

                             InfluencerFilter (“InfluencerFilter”, “Influencer”));

              allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFiltermeluas MessageFilter. Kelas ini menerapkan logika untuk memilih hanya tweet yang ditandai sebagai tweet berbahasa Inggris. Filter membuang tweet non-bahasa Inggris dan mendorong tweet berbahasa Inggris ke daftar berikutnya.

public class EnglishTweetsFilter memperluas MessageFilter {

       public EnglishTweetsFilter (String name, String listName) melempar Exception {

              super (nama, listName);

       }

       @Mengesampingkan

       public void filterAndPush (String message) melontarkan Exception {

              JsonParser jsonParser = new JsonParser ();

              JsonElement jsonElement = jsonParser.parse (pesan);

              JsonArray jsonArray = jsonElement.getAsJsonArray ();

              JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

              if (jsonObject.get (“lang”)! = null &&

jsonObject.get ("lang"). getAsString (). sama dengan ("en")) {

                             Jedis jedis = super.getJedisInstance ();

                             jika (jedis! = null) {

                                    jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}