Java - Deteksi dan Penanganan Gantung Benang

Oleh Alex. C. Punnen

Arsitek - Nokia Siemens Networks

Bangalore

Benang gantung adalah tantangan umum dalam pengembangan perangkat lunak yang harus berinteraksi dengan perangkat berpemilik menggunakan antarmuka berpemilik atau standar seperti SNMP, Q3, atau Telnet. Masalah ini tidak terbatas pada manajemen jaringan tetapi terjadi di berbagai bidang seperti server web, proses yang meminta panggilan prosedur jarak jauh, dan sebagainya.

Sebuah utas yang memulai permintaan ke perangkat memerlukan mekanisme untuk mendeteksi jika perangkat tidak merespons atau hanya merespons sebagian. Dalam beberapa kasus di mana hang seperti itu terdeteksi, tindakan khusus harus diambil. Tindakan tertentu dapat berupa uji coba ulang atau memberi tahu pengguna akhir tentang kegagalan tugas atau beberapa opsi pemulihan lainnya. Dalam beberapa kasus di mana sejumlah besar tugas harus dipecat ke sejumlah besar elemen jaringan oleh sebuah komponen, deteksi thread gantung penting agar tidak menjadi hambatan untuk pemrosesan tugas lainnya. Jadi ada dua aspek untuk mengelola utas gantung: kinerja dan pemberitahuan .

Untuk aspek notifikasi, kami dapat menyesuaikan pola Java Observer agar sesuai dengan dunia multithread.

Menyesuaikan Pola Pengamat Java ke Sistem Multithread

Karena tugas yang menggantung, menggunakan ThreadPoolkelas Java dengan strategi yang pas adalah solusi pertama yang terlintas dalam pikiran. Namun menggunakan Java ThreadPooldalam konteks beberapa utas yang menggantung secara acak selama periode waktu tertentu memberikan perilaku yang tidak diinginkan berdasarkan strategi tertentu yang digunakan, seperti kelaparan utas dalam kasus strategi kumpulan utas tetap. Hal ini terutama disebabkan oleh fakta bahwa Java ThreadPooltidak memiliki mekanisme untuk mendeteksi thread yang hang.

Kami dapat mencoba kumpulan utas dalam cache, tetapi juga memiliki masalah. Jika ada tingkat pengaktifan tugas yang tinggi, dan beberapa utas macet, jumlah utas dapat meningkat, yang pada akhirnya menyebabkan kekurangan sumber daya dan pengecualian kehabisan memori. Atau kita bisa menggunakan ThreadPoolstrategi Kustom yang meminta CallerRunsPolicy. Dalam kasus ini, juga, utas yang menggantung dapat menyebabkan semua utas pada akhirnya macet. (Utas utama tidak boleh menjadi pemanggil, karena ada kemungkinan bahwa tugas apa pun yang diteruskan ke utas utama dapat macet, menyebabkan semuanya terhenti.)

Jadi apa solusinya? Saya akan mendemonstrasikan pola ThreadPool yang tidak terlalu sederhana yang menyesuaikan ukuran kolam sesuai dengan tingkat tugas dan berdasarkan jumlah utas yang menggantung. Mari kita pergi ke masalah mendeteksi benang gantung.

Mendeteksi Benang Gantung

Gambar 1 menunjukkan abstraksi dari pola:

Ada dua kelas penting di sini: ThreadManagerdan ManagedThread. Keduanya diturunkan dari Threadkelas Java . The ThreadManagermemegang wadah yang memegang ManagedThreads. Saat baru ManagedThreaddibuat, ia menambahkan dirinya sendiri ke penampung ini.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

The ThreadManagerberiterasi daftar ini dan menyebut ManagedThread's isHung()metode. Ini pada dasarnya adalah logika pemeriksaan stempel waktu.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

Jika ditemukan bahwa utas telah masuk ke loop tugas dan tidak pernah memperbarui hasilnya, diperlukan mekanisme pemulihan seperti yang ditetapkan oleh ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

Untuk yang baru yang ManagedThreadakan dibuat dan digunakan sebagai pengganti yang digantung itu tidak boleh menahan negara bagian atau wadah apa pun. Untuk ini wadah tempat ManagedThreadtindakan harus dipisahkan. Di sini kita menggunakan pola Singleton berbasis ENUM untuk menyimpan daftar Tugas. Jadi wadah yang menampung tugas tidak bergantung pada thread yang memproses tugas. Klik tautan berikut untuk mengunduh sumber pola yang dijelaskan: Sumber Java Thread Manager.

Menggantung Benang dan Strategi Java ThreadPool

Java ThreadPooltidak memiliki mekanisme untuk mendeteksi utas yang menggantung. Menggunakan strategi seperti threadpool tetap ( Executors.newFixedThreadPool()) tidak akan berfungsi karena jika beberapa tugas hang seiring waktu, semua utas pada akhirnya akan berada dalam status hang. Opsi lainnya adalah menggunakan kebijakan ThreadPool yang di-cache (Executors.newCachedThreadPool()). Ini dapat memastikan bahwa akan selalu ada utas yang tersedia untuk memproses tugas, hanya dibatasi oleh memori VM, CPU, dan batas utas. Namun, dengan kebijakan ini tidak ada kontrol atas jumlah utas yang dibuat. Terlepas dari apakah thread pemrosesan macet atau tidak, menggunakan kebijakan ini saat tingkat tugas tinggi menyebabkan sejumlah besar thread dibuat. Jika Anda tidak memiliki cukup sumber daya untuk JVM segera Anda akan mencapai ambang memori maksimum atau CPU tinggi. Sangat umum untuk melihat jumlah utas mencapai ratusan atau ribuan. Meskipun mereka dilepaskan setelah tugas diproses, terkadang selama penanganan burst, jumlah thread yang tinggi akan membanjiri sumber daya sistem.

Opsi ketiga adalah menggunakan strategi atau kebijakan kustom. Salah satu opsi tersebut adalah memiliki kumpulan utas yang berskala dari 0 hingga beberapa angka maksimum. Jadi bahkan jika satu utas digantung, utas baru akan dibuat selama jumlah utas maksimum tercapai:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Di sini 3 adalah jumlah utas maksimum dan waktu keep-hidup diatur ke 60 detik karena ini adalah proses intensif tugas. Jika kami memberikan jumlah utas maksimum yang cukup tinggi, ini kurang lebih merupakan kebijakan yang masuk akal untuk digunakan dalam konteks tugas gantung. Satu-satunya masalah adalah bahwa jika utas gantung tidak dilepaskan pada akhirnya ada sedikit kemungkinan bahwa semua utas pada suatu saat bisa menggantung. Jika utas maksimum cukup tinggi dan mengasumsikan bahwa tugas hang adalah fenomena yang jarang, maka kebijakan ini akan sesuai dengan tagihan.

Akan lebih baik jika ThreadPoolmereka juga memiliki mekanisme yang dapat dipasang untuk mendeteksi benang gantung. Saya akan membahas satu desain seperti itu nanti. Tentu saja jika semua utas dibekukan, Anda dapat mengonfigurasi dan menggunakan kebijakan tugas yang ditolak dari kumpulan utas. Jika Anda tidak ingin membuang tugas, Anda harus menggunakan CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

Dalam kasus ini, jika utas hang menyebabkan tugas ditolak, tugas itu akan diberikan ke utas pemanggil untuk ditangani. Selalu ada kemungkinan tugas itu terlalu menggantung. Dalam hal ini seluruh proses akan macet. Jadi lebih baik tidak menambahkan kebijakan seperti itu dalam konteks ini.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

Sebuah ThreadPool Kustom dengan Deteksi Hang

Pustaka kumpulan utas dengan kemampuan deteksi dan penanganan hang tugas akan sangat bagus untuk dimiliki. Saya telah mengembangkan satu dan saya akan mendemonstrasikannya di bawah. Ini sebenarnya adalah port dari kumpulan benang C ++ yang saya rancang dan gunakan beberapa waktu lalu (lihat referensi). Pada dasarnya solusi ini menggunakan pola Command dan pola Chain of Responsibility. Namun untuk mengimplementasikan pola Command di Java tanpa bantuan dukungan objek Function agak sulit. Untuk ini saya harus sedikit mengubah implementasinya untuk menggunakan refleksi Java. Perhatikan bahwa konteks di mana pola ini dirancang adalah di mana kumpulan utas harus dipasang / dicolokkan tanpa memodifikasi kelas yang ada.(Saya percaya satu manfaat besar dari pemrograman berorientasi objek adalah ia memberi kita cara untuk merancang kelas sehingga dapat menggunakan Prinsip Tertutup Terbuka secara efektif. Hal ini terutama berlaku untuk kode lama yang kompleks dan mungkin kurang relevan untuk pengembangan produk baru.) Oleh karena itu saya menggunakan refleksi daripada menggunakan antarmuka untuk mengimplementasikan pola Perintah. Sisa kode dapat di-porting tanpa perubahan besar karena hampir semua sinkronisasi thread dan primitif pensinyalan tersedia di Java 1.5 dan seterusnya.Sisa kode dapat di-porting tanpa perubahan besar karena hampir semua sinkronisasi thread dan primitif pensinyalan tersedia di Java 1.5 dan seterusnya.Sisa kode dapat di-porting tanpa perubahan besar karena hampir semua sinkronisasi thread dan primitif pensinyalan tersedia di Java 1.5 dan seterusnya.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Contoh penggunaan pola ini:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Sekarang kami memiliki dua kelas penting lagi di sini. Salah satunya adalah ThreadChainkelas, yang menerapkan pola Rantai Tanggung Jawab:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Kelas ini memiliki dua metode utama. Salah satunya adalah Boolean CanHandle()yang diprakarsai oleh ThreadPoolkelas dan kemudian dilanjutkan secara rekursif. Ini memeriksa apakah utas saat ini ( ThreadChaincontoh saat ini ) bebas untuk menangani tugas. Jika sudah menangani tugas, ia akan memanggil tugas berikutnya dalam rantai.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Perhatikan bahwa HandleRequestadalah metode ThreadChainyang dipanggil dari Thread run()metode dan menunggu sinyal dari canHandlemetode tersebut. Perhatikan juga bagaimana tugas ditangani melalui pola Command.