Dibuat untuk waktu nyata: Perpesanan data besar dengan Apache Kafka, Bagian 2

Di paruh pertama pengenalan JavaWorld ke Apache Kafka, Anda mengembangkan beberapa aplikasi produsen / konsumen skala kecil menggunakan Kafka. Dari latihan ini, Anda harus terbiasa dengan dasar-dasar sistem pesan Apache Kafka. Di paruh kedua ini, Anda akan mempelajari cara menggunakan partisi untuk mendistribusikan beban dan menskalakan aplikasi Anda secara horizontal, menangani hingga jutaan pesan per hari. Anda juga akan mempelajari bagaimana Kafka menggunakan penyeimbangan pesan untuk melacak dan mengelola pemrosesan pesan yang kompleks, dan bagaimana melindungi sistem perpesanan Apache Kafka Anda dari kegagalan jika konsumen turun. Kami akan mengembangkan aplikasi contoh dari Bagian 1 untuk kasus penggunaan publish-subscribe dan point-to-point.

Partisi di Apache Kafka

Topik di Kafka dapat dibagi lagi menjadi beberapa partisi. Misalnya, saat membuat topik bernama Demo, Anda mungkin mengkonfigurasinya untuk memiliki tiga partisi. Server akan membuat tiga file log, satu untuk setiap partisi demo. Ketika produser menerbitkan pesan ke topik, itu akan menetapkan ID partisi untuk pesan itu. Server kemudian akan menambahkan pesan ke file log untuk partisi itu saja.

Jika Anda kemudian memulai dua konsumen, server mungkin menetapkan partisi 1 dan 2 ke konsumen pertama, dan partisi 3 ke konsumen kedua. Setiap konsumen hanya akan membaca dari partisi yang ditetapkan. Anda dapat melihat topik Demo yang dikonfigurasi untuk tiga partisi pada Gambar 1.

Untuk memperluas skenario, bayangkan cluster Kafka dengan dua broker, bertempat di dua mesin. Ketika Anda mempartisi topik demo, Anda akan mengkonfigurasinya untuk memiliki dua partisi dan dua replika. Untuk jenis konfigurasi ini, server Kafka akan menetapkan dua partisi ke dua broker di cluster Anda. Setiap pialang akan menjadi pemimpin untuk salah satu partisi.

Ketika seorang produser menerbitkan pesan, itu akan pergi ke pemimpin partisi. Pemimpin akan mengambil pesan dan menambahkannya ke file log di mesin lokal. Pialang kedua akan secara pasif mereplikasi log komit itu ke mesinnya sendiri. Jika pemimpin partisi turun, broker kedua akan menjadi pemimpin baru dan mulai melayani permintaan klien. Dengan cara yang sama, ketika konsumen mengirim permintaan ke sebuah partisi, permintaan itu akan pergi dulu ke pemimpin partisi, yang akan mengembalikan pesan yang diminta.

Manfaat partisi

Pertimbangkan manfaat mempartisi sistem pesan berbasis Kafka:

  1. Skalabilitas : Dalam sistem dengan hanya satu partisi, pesan yang diterbitkan ke topik disimpan dalam file log, yang ada di satu mesin. Jumlah pesan untuk suatu topik harus sesuai dengan satu file log komit, dan ukuran pesan yang disimpan tidak boleh lebih dari ruang disk mesin itu. Mempartisi topik memungkinkan Anda menskalakan sistem Anda dengan menyimpan pesan pada mesin yang berbeda dalam sebuah cluster. Jika Anda ingin menyimpan 30 gigabyte (GB) pesan untuk topik Demo, misalnya, Anda dapat membuat cluster Kafka dari tiga mesin, masing-masing dengan 10 GB ruang disk. Kemudian Anda akan mengkonfigurasi topik agar memiliki tiga partisi.
  2. Penyeimbangan beban server : Memiliki banyak partisi memungkinkan Anda menyebarkan permintaan pesan ke seluruh broker. Misalnya, Jika Anda memiliki topik yang memproses 1 juta pesan per detik, Anda dapat membaginya menjadi 100 partisi dan menambahkan 100 broker ke cluster Anda. Setiap broker akan menjadi pemimpin untuk partisi tunggal, bertanggung jawab untuk menanggapi hanya 10.000 permintaan klien per detik.
  3. Penyeimbangan muatan konsumen : Mirip dengan penyeimbangan muatan server, menghosting banyak konsumen pada mesin yang berbeda memungkinkan Anda menyebarkan muatan konsumen. Katakanlah Anda ingin mengonsumsi 1 juta pesan per detik dari sebuah topik dengan 100 partisi. Anda dapat membuat 100 konsumen dan menjalankannya secara paralel. Server Kafka akan menetapkan satu partisi untuk setiap konsumen, dan setiap konsumen akan memproses 10.000 pesan secara paralel. Karena Kafka menetapkan setiap partisi hanya untuk satu konsumen, di dalam partisi setiap pesan akan dikonsumsi secara berurutan.

Dua cara untuk mempartisi

Produser bertanggung jawab untuk memutuskan partisi mana yang akan dituju pesan. Produser memiliki dua opsi untuk mengontrol tugas ini:

  • Pemartisi kustom : Anda dapat membuat kelas yang mengimplementasikan org.apache.kafka.clients.producer.Partitionerantarmuka. Kustom ini Partitionerakan menerapkan logika bisnis untuk memutuskan ke mana pesan dikirim.
  • DefaultPartitioner : Jika Anda tidak membuat kelas pemartisi kustom, maka secara default org.apache.kafka.clients.producer.internals.DefaultPartitionerkelas tersebut akan digunakan. Pemartisi default cukup baik untuk banyak kasus, menyediakan tiga opsi:
    1. Manual : Saat Anda membuat ProducerRecord, gunakan konstruktor kelebihan beban new ProducerRecord(topicName, partitionId,messageKey,message)untuk menentukan ID partisi.
    2. Hashing (Lokalitas sensitif) : Saat Anda membuat ProducerRecord, tentukan a messageKey, dengan memanggil new ProducerRecord(topicName,messageKey,message). DefaultPartitionerakan menggunakan hash kunci untuk memastikan bahwa semua pesan untuk kunci yang sama dikirim ke produser yang sama. Ini adalah pendekatan termudah dan paling umum.
    3. Penyemprotan (Random Load Balancing) : Jika Anda tidak ingin mengontrol ke mana pesan partisi, cukup panggil new ProducerRecord(topicName, message)untuk membuat ProducerRecord. Dalam hal ini, pemartisi akan mengirim pesan ke semua partisi dengan cara round-robin, memastikan pemuatan server yang seimbang.

Mempartisi aplikasi Apache Kafka

Untuk contoh produsen / konsumen sederhana di Bagian 1, kami menggunakan a DefaultPartitioner. Sekarang kami akan mencoba membuat pemartisi khusus sebagai gantinya. Untuk contoh ini, anggap saja kita memiliki situs retail yang dapat digunakan konsumen untuk memesan produk di mana pun di dunia. Berdasarkan penggunaan, kami mengetahui bahwa sebagian besar konsumen berada di Amerika Serikat atau India. Kami ingin mempartisi aplikasi kami untuk mengirim pesanan dari AS atau India ke konsumen masing-masing, sementara pesanan dari mana saja akan dikirim ke konsumen ketiga.

Untuk memulai, kami akan membuat CountryPartitioneryang mengimplementasikan org.apache.kafka.clients.producer.Partitionerantarmuka. Kita harus menerapkan metode berikut:

  1. Kafka akan memanggil konfigurasi () ketika kita menginisialisasi Partitionerkelas, dengan Mapproperti konfigurasi. Metode ini menginisialisasi fungsi khusus untuk logika bisnis aplikasi, seperti menghubungkan ke database. Dalam hal ini kami menginginkan pemartisi yang cukup umum yang digunakan countryNamesebagai properti. Kami kemudian dapat menggunakan configProperties.put("partitions.0","USA")untuk memetakan aliran pesan ke partisi. Nanti kita bisa menggunakan format ini untuk mengubah negara mana yang mendapatkan partisi sendiri.
  2. The ProducerAPI panggilan partisi () sekali untuk setiap pesan. Dalam hal ini kami akan menggunakannya untuk membaca pesan dan mengurai nama negara dari pesan tersebut. Jika nama negara di countryToPartitionMap, itu akan kembali partitionIddisimpan di Map. Jika tidak, itu akan mencirikan nilai negara dan menggunakannya untuk menghitung partisi mana yang harus dituju.
  3. Kami memanggil close () untuk mematikan pemartisi. Menggunakan metode ini memastikan bahwa sumber daya yang diperoleh selama inisialisasi dibersihkan selama pematian.

Perhatikan bahwa saat Kafka memanggil configure(), produser Kafka akan meneruskan semua properti yang telah kita konfigurasikan untuk produser ke Partitionerkelas. Sangat penting bahwa kita hanya membaca properti yang dimulai dengan partitions., mengurai untuk mendapatkan partitionId, dan menyimpan ID countryToPartitionMap.

Di bawah ini adalah implementasi kustom Partitionerantarmuka kami.

Daftar 1. CountryPartitioner

 public class CountryPartitioner implements Partitioner { private static Map countryToPartitionMap; public void configure(Map configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap(); for(Map.Entry entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} } 

The Producerkelas Listing 2 (bawah) sangat mirip dengan produser sederhana kami dari Bagian 1, dengan dua perubahan ditandai dengan huruf tebal:

  1. Kami menetapkan properti config dengan kunci yang sama dengan nilai ProducerConfig.PARTITIONER_CLASS_CONFIG, yang cocok dengan nama CountryPartitionerkelas kami yang memenuhi syarat . Kami juga mengatur countryNameke partitionId, sehingga memetakan properti yang ingin kami teruskan CountryPartitioner.
  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We'll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.

Listing 2. A partitioned producer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");  configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } } 

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer, thereby guaranteeing the order of message consumption. You can manually assign a partition or have it assigned automatically.

If your business logic demands more control, then you'll need to manually assign partitions. In this case you would use KafkaConsumer.assign() to pass a list of partitions that each consumer was interested in to the Kakfa server.

Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.

Katakanlah Anda membuat topik baru dengan tiga partisi. Saat Anda memulai konsumen pertama untuk topik baru, Kafka akan menetapkan ketiga partisi ke konsumen yang sama. Jika Anda kemudian memulai konsumen kedua, Kafka akan menetapkan ulang semua partisi, menetapkan satu partisi ke konsumen pertama dan dua partisi tersisa ke konsumen kedua. Jika Anda menambahkan konsumen ketiga, Kafka akan menetapkan kembali partisi tersebut, sehingga setiap konsumen diberi satu partisi. Terakhir, jika Anda memulai konsumen keempat dan kelima, maka tiga konsumen akan memiliki partisi yang ditetapkan, tetapi yang lain tidak akan menerima pesan apa pun. Jika salah satu dari tiga partisi awal mati, Kafka akan menggunakan logika partisi yang sama untuk menetapkan kembali partisi konsumen tersebut ke salah satu konsumen tambahan.