Dibuat untuk waktu nyata: Perpesanan data besar dengan Apache Kafka, Bagian 1

Saat pergerakan big data dimulai, sebagian besar difokuskan pada pemrosesan batch. Penyimpanan data terdistribusi dan alat kueri seperti MapReduce, Hive, dan Pig semuanya dirancang untuk memproses data dalam batch daripada terus menerus. Bisnis akan menjalankan banyak pekerjaan setiap malam untuk mengekstrak data dari database, lalu menganalisis, mengubah, dan akhirnya menyimpan data. Baru-baru ini perusahaan telah menemukan kekuatan analisis dan pemrosesan data dan peristiwa saat terjadi , tidak hanya sekali setiap beberapa jam. Namun, sebagian besar sistem perpesanan tradisional tidak meningkatkan skala untuk menangani data besar secara realtime. Jadi para insinyur di LinkedIn membangun dan open-source Apache Kafka: kerangka kerja perpesanan terdistribusi yang memenuhi permintaan data besar dengan menskalakan perangkat keras komoditas.

Selama beberapa tahun terakhir, Apache Kafka telah muncul untuk memecahkan berbagai kasus penggunaan. Dalam kasus yang paling sederhana, ini bisa menjadi buffer sederhana untuk menyimpan log aplikasi. Dikombinasikan dengan teknologi seperti Spark Streaming, ini dapat digunakan untuk melacak perubahan data dan mengambil tindakan pada data tersebut sebelum menyimpannya ke tujuan akhir. Mode prediktif Kafka menjadikannya alat yang ampuh untuk mendeteksi penipuan, seperti memeriksa validitas transaksi kartu kredit saat itu terjadi, dan tidak menunggu pemrosesan batch beberapa jam kemudian.

Tutorial dua bagian ini memperkenalkan Kafka, dimulai dengan cara menginstal dan menjalankannya di lingkungan pengembangan Anda. Anda akan mendapatkan gambaran umum tentang arsitektur Kafka, diikuti dengan pengenalan untuk mengembangkan sistem perpesanan Apache Kafka yang out-of-the-box. Terakhir, Anda akan membangun aplikasi produsen / konsumen khusus yang mengirim dan menggunakan pesan melalui server Kafka. Di paruh kedua tutorial, Anda akan mempelajari cara mempartisi dan mengelompokkan pesan, dan cara mengontrol pesan mana yang akan dikonsumsi oleh konsumen Kafka.

Apa itu Apache Kafka?

Apache Kafka adalah sistem perpesanan yang dibuat untuk menyesuaikan skala data besar. Mirip dengan Apache ActiveMQ atau RabbitMq, Kafka memungkinkan aplikasi yang dibangun pada platform berbeda untuk berkomunikasi melalui penyampaian pesan asinkron. Tetapi Kafka berbeda dari sistem perpesanan yang lebih tradisional ini dalam beberapa hal utama:

  • Ini dirancang untuk menskalakan secara horizontal, dengan menambahkan lebih banyak server komoditas.
  • Ini memberikan hasil yang jauh lebih tinggi untuk proses produsen dan konsumen.
  • Ini dapat digunakan untuk mendukung kasus penggunaan batch dan real-time.
  • Itu tidak mendukung JMS, API middleware berorientasi pesan Java.

Arsitektur Apache Kafka

Sebelum kita menjelajahi arsitektur Kafka, Anda harus mengetahui terminologi dasarnya:

  • Sebuah produsen adalah proses yang dapat mempublikasikan pesan ke topik.
  • a konsumen adalah proses yang dapat berlangganan satu atau lebih topik dan mengkonsumsi pesan dipublikasikan ke topik.
  • Sebuah kategori topik adalah nama dari pakan yang pesan diterbitkan.
  • Sebuah broker adalah proses yang berjalan pada mesin tunggal.
  • Sebuah klaster adalah sekelompok broker bekerja sama.

Arsitektur Apache Kafka sangat sederhana, yang dapat menghasilkan kinerja dan throughput yang lebih baik di beberapa sistem. Setiap topik di Kafka seperti file log sederhana. Saat produser menerbitkan pesan, server Kafka menambahkannya ke akhir file log untuk topik tertentu. Server juga memberikan offset , yaitu angka yang digunakan untuk mengidentifikasi setiap pesan secara permanen. Seiring bertambahnya jumlah pesan, nilai setiap offset meningkat; misalnya jika produser menerbitkan tiga pesan, yang pertama mungkin mendapatkan offset 1, yang kedua offset 2, dan yang ketiga offset 3.

Ketika konsumen Kafka pertama kali memulai, ia akan mengirim permintaan tarik ke server, meminta untuk mengambil pesan apa pun untuk topik tertentu dengan nilai offset lebih tinggi dari 0. Server akan memeriksa file log untuk topik itu dan mengembalikan tiga pesan baru . Konsumen akan memproses pesan, kemudian mengirim permintaan untuk pesan dengan offset lebih tinggi dari 3, dan seterusnya.

Di Kafka, klien bertanggung jawab untuk mengingat jumlah offset dan mengambil pesan. Server Kafka tidak melacak atau mengelola konsumsi pesan. Secara default, server Kafka akan menyimpan pesan selama tujuh hari. Utas latar belakang di server memeriksa dan menghapus pesan yang berumur tujuh hari atau lebih. Seorang konsumen dapat mengakses pesan selama mereka berada di server. Itu dapat membaca pesan beberapa kali, dan bahkan membaca pesan dalam urutan penerimaan terbalik. Tetapi jika konsumen gagal untuk mengambil pesan sebelum tujuh hari habis, itu akan melewatkan pesan itu.

Tolok ukur Kafka

Penggunaan produksi oleh LinkedIn dan perusahaan lain telah menunjukkan bahwa dengan konfigurasi yang tepat, Apache Kafka mampu memproses ratusan gigabyte data setiap hari. Pada tahun 2011, tiga insinyur LinkedIn menggunakan pengujian benchmark untuk menunjukkan bahwa Kafka dapat mencapai throughput yang jauh lebih tinggi daripada ActiveMQ dan RabbitMQ.

Penyiapan dan demo cepat Apache Kafka

Kami akan membuat aplikasi khusus dalam tutorial ini, tetapi mari kita mulai dengan menginstal dan menguji instance Kafka dengan produsen dan konsumen out-of-the-box.

  1. Kunjungi halaman unduh Kafka untuk menginstal versi terbaru (0,9 saat tulisan ini dibuat).
  2. Ekstrak binari ke dalam software/kafkafolder. Untuk versi saat ini software/kafka_2.11-0.9.0.0.
  3. Ubah direktori Anda saat ini untuk menunjuk ke folder baru.
  4. Mulai server Zookeeper dengan mengeksekusi perintah: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Mulai server Kafka dengan mengeksekusi: bin/kafka-server-start.sh config/server.properties.
  6. Membuat topik tes yang dapat Anda gunakan untuk pengujian: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Mulai konsol konsumen sederhana yang dapat mengkonsumsi pesan dipublikasikan ke suatu topik tertentu, seperti javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Memulai sebuah konsol produser sederhana yang dapat mempublikasikan pesan ke topik uji: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Coba ketikkan satu atau dua pesan ke konsol produser. Pesan Anda harus ditampilkan di konsol konsumen.

Contoh aplikasi dengan Apache Kafka

Anda telah melihat bagaimana Apache Kafka bekerja di luar kotak. Selanjutnya, mari kembangkan aplikasi produsen / konsumen khusus. Produser akan mengambil masukan pengguna dari konsol dan mengirim setiap baris baru sebagai pesan ke server Kafka. Konsumen akan mengambil pesan untuk topik tertentu dan mencetaknya ke konsol. Komponen produsen dan konsumen dalam hal ini adalah implementasi Anda sendiri dari kafka-console-producer.shdan kafka-console-consumer.sh.

Mari kita mulai dengan membuat Producer.javakelas. Kelas klien ini berisi logika untuk membaca masukan pengguna dari konsol dan mengirim masukan tersebut sebagai pesan ke server Kafka.

Kami mengkonfigurasi produser dengan membuat objek dari java.util.Propertieskelas dan mengatur propertinya. Kelas ProducerConfig mendefinisikan semua properti berbeda yang tersedia, tetapi nilai default Kafka cukup untuk sebagian besar penggunaan. Untuk konfigurasi default kita hanya perlu mengatur tiga properti wajib:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

Dalam kasus aplikasi contoh, kita tahu produser menggunakan ByteArraySerializeruntuk kunci dan StringSerializernilainya. Oleh karena itu, di sisi klien kita perlu menggunakan org.apache.kafka.common.serialization.ByteArrayDeserializeruntuk kunci dan org.apache.kafka.common.serialization.StringDeserializernilainya. Menyetel kelas-kelas itu sebagai nilai untuk KEY_DESERIALIZER_CLASS_CONFIGdan VALUE_DESERIALIZER_CLASS_CONFIGakan memungkinkan konsumen untuk menghentikan byte[]jenis penyandian yang dikirim oleh produsen.

Terakhir, kita perlu mengatur nilai file GROUP_ID_CONFIG. Ini harus menjadi nama grup dalam format string. Saya akan menjelaskan lebih lanjut tentang konfigurasi ini sebentar lagi. Untuk saat ini, lihat saja konsumen Kafka dengan empat set properti wajib: