Bagaimana membangun aplikasi streaming stateful dengan Apache Flink

Fabian Hueske adalah seorang pelaku dan anggota PMC dari proyek Apache Flink dan salah satu pendiri Data Artisans.

Apache Flink adalah kerangka kerja untuk mengimplementasikan aplikasi pemrosesan aliran stateful dan menjalankannya dalam skala besar pada cluster komputasi. Dalam artikel sebelumnya, kita telah memeriksa apa itu pemrosesan aliran stateful, kasus penggunaan apa yang ditangani, dan mengapa Anda harus mengimplementasikan dan menjalankan aplikasi streaming Anda dengan Apache Flink.

Pada artikel ini, saya akan menyajikan contoh untuk dua kasus penggunaan umum dari pemrosesan aliran stateful dan membahas bagaimana mereka dapat diimplementasikan dengan Flink. Kasus penggunaan pertama adalah aplikasi yang digerakkan oleh peristiwa, yaitu, aplikasi yang menyerap aliran peristiwa yang berkelanjutan dan menerapkan beberapa logika bisnis ke peristiwa ini. Yang kedua adalah kasus penggunaan analitik streaming, di mana saya akan menyajikan dua kueri analitik yang diterapkan dengan API SQL Flink, yang menggabungkan data streaming secara real-time. Kami di Data Artisans memberikan kode sumber dari semua contoh kami di repositori GitHub publik.

Sebelum kita menyelami detail contoh, saya akan memperkenalkan aliran acara yang diserap oleh aplikasi contoh dan menjelaskan bagaimana Anda dapat menjalankan kode yang kami sediakan.

Aliran acara naik taksi

Contoh aplikasi kami didasarkan pada kumpulan data publik tentang naik taksi yang terjadi di Kota New York pada tahun 2013. Penyelenggara DEBS (Konferensi Internasional ACM 2015 tentang Sistem Berbasis Acara Terdistribusi) Grand Challenge mengatur ulang kumpulan data asli dan mengubahnya menjadi satu file CSV tempat kita membaca sembilan bidang berikut.

  • Medali — nomor MD5 taksi
  • Hack_license — nomor MD5 dari lisensi taksi
  • Pickup_datetime — waktu saat penumpang dijemput
  • Dropoff_datetime — waktu ketika penumpang diturunkan
  • Pickup_longitude — bujur lokasi penjemputan
  • Pickup_latitude — garis lintang lokasi pengambilan
  • Dropoff_longitude — bujur lokasi penurunan
  • Dropoff_latitude — garis lintang lokasi penurunan
  • Total_amount — total yang dibayarkan dalam dolar

File CSV menyimpan catatan dalam urutan menaik dari atribut waktu pengantarannya. Karenanya, file tersebut dapat diperlakukan sebagai log peristiwa yang diurutkan yang diterbitkan saat perjalanan berakhir. Untuk menjalankan contoh yang kami berikan di GitHub, Anda perlu mengunduh kumpulan data tantangan DEBS dari Google Drive.

Semua aplikasi contoh secara berurutan membaca file CSV dan menelannya sebagai aliran acara naik taksi. Dari sana, aplikasi memproses kejadian seperti aliran lainnya, misalnya, seperti aliran yang diserap dari sistem langganan-publikasi berbasis log, seperti Apache Kafka atau Kinesis. Faktanya, membaca file (atau jenis data persisten lainnya) dan memperlakukannya sebagai aliran adalah landasan pendekatan Flink untuk menyatukan pemrosesan batch dan aliran.

Menjalankan contoh Flink

Seperti yang disebutkan sebelumnya, kami menerbitkan kode sumber aplikasi contoh kami di repositori GitHub. Kami mendorong Anda untuk bercabang dan mengkloning repositori. Contoh-contoh tersebut dapat dengan mudah dijalankan dari dalam IDE pilihan Anda; Anda tidak perlu menyiapkan dan mengkonfigurasi kluster Flink untuk menjalankannya. Pertama, impor kode sumber dari contoh sebagai proyek Maven. Kemudian, jalankan kelas utama aplikasi dan berikan lokasi penyimpanan file data (lihat link di atas untuk mendownload data) sebagai parameter program.

Setelah Anda meluncurkan aplikasi, itu akan memulai instance Flink lokal yang disematkan di dalam proses JVM aplikasi dan mengirimkan aplikasi untuk menjalankannya. Anda akan melihat banyak pernyataan log saat Flink dimulai dan tugas pekerjaan sedang dijadwalkan. Setelah aplikasi berjalan, keluarannya akan ditulis ke keluaran standar.

Membangun aplikasi berbasis peristiwa di Flink

Sekarang, mari kita bahas kasus penggunaan pertama kita, yang merupakan aplikasi berbasis peristiwa. Aplikasi berbasis peristiwa menyerap aliran peristiwa, melakukan penghitungan saat peristiwa diterima, dan mungkin mengeluarkan peristiwa baru atau memicu tindakan eksternal. Beberapa aplikasi yang digerakkan oleh peristiwa dapat disusun dengan menghubungkannya melalui sistem log peristiwa, serupa dengan bagaimana sistem besar dapat disusun dari layanan mikro. Aplikasi yang digerakkan oleh peristiwa, log peristiwa, dan snapshot status aplikasi (dikenal sebagai savepoints di Flink) terdiri dari pola desain yang sangat kuat karena Anda dapat menyetel ulang statusnya dan memutar ulang inputnya untuk memulihkan dari kegagalan, memperbaiki bug, atau untuk memigrasi aplikasi ke cluster yang berbeda.

Pada artikel ini kita akan memeriksa aplikasi berbasis peristiwa yang mendukung layanan, yang memantau jam kerja pengemudi taksi. Pada tahun 2016, Komisi Taksi dan Limusin NYC memutuskan untuk membatasi jam kerja pengemudi taksi menjadi 12 jam shift dan memerlukan istirahat setidaknya delapan jam sebelum shift berikutnya dapat dimulai. Pergeseran dimulai dengan awal perjalanan pertama. Sejak saat itu, pengemudi dapat memulai perjalanan baru dalam waktu 12 jam. Aplikasi kami melacak perjalanan pengemudi, menandai waktu berakhirnya jendela 12 jam mereka (yaitu, waktu ketika mereka dapat memulai perjalanan terakhir), dan menandai perjalanan yang melanggar peraturan. Anda dapat menemukan kode sumber lengkap dari contoh ini di repositori GitHub kami.

Aplikasi kita diimplementasikan dengan Flink's DataStream API dan a KeyedProcessFunction. API DataStream adalah API fungsional dan berdasarkan konsep aliran data yang diketik. A DataStreamadalah representasi logis dari aliran peristiwa berjenis T. Aliran diproses dengan menerapkan fungsi padanya yang menghasilkan aliran data lain, mungkin dari jenis yang berbeda. Flink memproses aliran secara paralel dengan mendistribusikan kejadian ke partisi aliran dan menerapkan contoh fungsi yang berbeda ke setiap partisi.

Cuplikan kode berikut menunjukkan aliran tingkat tinggi dari aplikasi pemantauan kami.

// menelan arus naik taksi.

Wahana DataStream = TaxiRides.getRides (env, inputPath);

Aliran data notifikasi = wahana

   // aliran partisi dengan id SIM

   .keyBy (r -> r.licenseId)

   // pantau acara berkendara dan buat notifikasi

   .process (baru MonitorWorkTime ());

// mencetak pemberitahuan

notifikasi.print ();

Aplikasi mulai menelan aliran acara naik taksi. Dalam contoh kami, acara dibaca dari file teks, diurai, dan disimpan dalam TaxiRideobjek POJO. Aplikasi dunia nyata biasanya menyerap peristiwa dari antrean pesan atau log peristiwa, seperti Apache Kafka atau Pravega. Langkah selanjutnya adalah untuk menentukan TaxiRideacara oleh licenseIdpengemudi. The keyBypartisi operasi aliran di lapangan menyatakan, sehingga semua peristiwa dengan tombol yang sama diproses oleh contoh paralel yang sama dari fungsi berikut. Dalam kasus kami, kami melakukan partisi di licenseIdlapangan karena kami ingin memantau waktu kerja masing-masing driver.

Selanjutnya, kami menerapkan MonitorWorkTimefungsi tersebut pada TaxiRideacara yang dipartisi . Fungsi ini melacak perjalanan per pengemudi dan memantau shift dan waktu istirahat mereka. Ini memancarkan peristiwa jenis Tuple2, di mana setiap tupel mewakili pemberitahuan yang terdiri dari ID lisensi pengemudi dan pesan. Akhirnya, aplikasi kita memancarkan pesan dengan mencetaknya ke keluaran standar. Aplikasi dunia nyata akan menulis pemberitahuan ke pesan eksternal atau sistem penyimpanan, seperti Apache Kafka, HDFS, atau sistem database, atau akan memicu panggilan eksternal untuk segera mendorongnya keluar.

Sekarang kita telah membahas aliran keseluruhan aplikasi, mari kita lihat MonitorWorkTimefungsinya, yang berisi sebagian besar logika bisnis aplikasi yang sebenarnya. The MonitorWorkTimefungsi adalah stateful KeyedProcessFunctionyang ingests TaxiRideperistiwa dan memancarkan Tuple2catatan. The KeyedProcessFunctionantarmuka memiliki dua metode untuk mengolah data: processElement()dan onTimer(). The processElement()metode ini disebut untuk setiap peristiwa tiba. The onTimer()metode ini disebut ketika kebakaran waktu sebelumnya terdaftar. Cuplikan berikut menunjukkan kerangka MonitorWorkTimefungsi dan semua yang dideklarasikan di luar metode pemrosesan.

publik MonitorWorkTime kelas statis

    meluas KeyedProcessFunction {

  // konstanta waktu dalam milidetik

  panjang akhir statis pribadi ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 jam

  panjang akhir statis pribadi REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 jam

  panjang akhir statis pribadi CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 jam

 formatter DateTimeFormatter transien pribadi;

  // tuas status untuk menyimpan waktu mulai shift

  ValueState shiftStart;

  @Mengesampingkan

  public void open (Konfigurasi conf) {

    // daftarkan pegangan negara

    shiftStart = getRuntimeContext (). getState (

      new ValueStateDescriptor ("shiftStart", Types.LONG));

    // menginisialisasi pemformat waktu

    this.formatter = DateTimeFormat.forPattern (“yyyy-MM-hh HH: mm: dd”);

  }

  // processElement () dan onTimer () dibahas secara rinci di bawah.

}

Fungsi tersebut mendeklarasikan beberapa konstanta untuk interval waktu dalam milidetik, pemformat waktu, dan pegangan status untuk status kunci yang dikelola oleh Flink. Status terkelola secara berkala diperiksa dan dipulihkan secara otomatis jika terjadi kegagalan. Keadaan kunci diatur per kunci, yang berarti bahwa suatu fungsi akan mempertahankan satu nilai per pegangan dan kunci. Dalam kasus kami, MonitorWorkTimefungsi tersebut mempertahankan Longnilai untuk setiap kunci, yaitu untuk masing-masing licenseId. Negara shiftStartbagian menyimpan waktu mulai shift pengemudi. Pegangan status diinisialisasi dalam open()metode, yang dipanggil satu kali sebelum kejadian pertama diproses.

Sekarang, mari kita lihat processElement()metodenya.

@Mengesampingkan

public void processElement (

    Taksi naik,

    Konteks ctx,

    Pengumpul out) melempar Exception {

  // cari waktu mulai shift terakhir

  StartTs panjang = shiftStart.value ();

  jika (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // ini adalah perjalanan pertama dari shift baru.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    long endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      “Anda diizinkan untuk menerima penumpang baru hingga“ + formatter.print (endTs)));

    // daftarkan timer untuk membersihkan negara dalam 24 jam

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } lain jika (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // perjalanan ini dimulai setelah waktu kerja yang diizinkan berakhir.

    // itu melanggar peraturan!

    out.collect (Tuple2.of (ride.licenseId,

      “Perjalanan ini melanggar peraturan waktu kerja.”));

  }

}

The processElement()metode ini disebut untuk setiap TaxiRideacara. Pertama, metode ini mengambil waktu mulai shift pengemudi dari pegangan status. Jika status tidak berisi waktu mulai ( startTs == null) atau jika shift terakhir dimulai lebih dari 20 jam ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) lebih awal dari perjalanan saat ini, perjalanan saat ini adalah perjalanan pertama dari shift baru. Dalam kedua kasus tersebut, fungsi memulai shift baru dengan memperbarui waktu mulai shift ke waktu mulai dari perjalanan saat ini, mengirimkan pesan kepada pengemudi dengan waktu berakhirnya shift baru, dan mendaftarkan timer untuk membersihkan menyatakan dalam 24 jam.

Jika perjalanan saat ini bukan perjalanan pertama dari shift baru, fungsi memeriksa apakah itu melanggar peraturan waktu kerja, yaitu, apakah itu dimulai lebih dari 12 jam lebih lambat dari dimulainya shift pengemudi saat ini. Jika demikian, fungsi tersebut mengeluarkan pesan untuk memberi tahu pengemudi tentang pelanggaran tersebut.

The processElement()Metode dari MonitorWorkTimefungsi register timer untuk membersihkan negara 24 jam setelah dimulainya pergeseran. Menghapus keadaan yang tidak lagi diperlukan penting untuk mencegah ukuran negara berkembang karena keadaan bocor. Pengatur waktu akan aktif ketika waktu aplikasi melewati stempel waktu pengatur waktu. Pada titik itu, onTimer()metode tersebut dipanggil. Mirip dengan status, pengatur waktu dipertahankan per kunci, dan fungsi dimasukkan ke dalam konteks kunci terkait sebelum onTimer()metode dipanggil. Karenanya, semua akses negara diarahkan ke kunci yang aktif saat timer didaftarkan.

Mari kita lihat onTimer()metode dari MonitorWorkTime.

@Mengesampingkan

public void onTimer (

    timer lama,

    OnTimerContext ctx,

    Pengumpul out) melempar Exception {

  // hapus status shift jika belum ada shift baru yang dimulai.

  StartTs panjang = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

The processElement()Metode register timer selama 24 jam setelah pergeseran mulai membersihkan negara yang tidak lagi diperlukan. Membersihkan status adalah satu-satunya logika yang onTimer()diterapkan metode ini. Ketika pengatur waktu menyala, kami memeriksa apakah pengemudi memulai shift baru untuk sementara waktu, yaitu apakah waktu mulai shift berubah. Jika bukan itu masalahnya, kami menghapus status shift untuk pengemudi.