Sistem Süreçleri, Ağlar and Paylaşım

Değişemez olanı Değiştirmek

Eğer biraz dik kafalıysanız (anladığım kadarıyla) ve ödünç alma kurallarını nasıl es geçebilmenin bir yolu olup olmadığını kara kara düşünüyor olabilirsiniz.

Bu minik programı bir inceleyin, derlenecek ve hiçbir hata vermeyecektir.

// cell.rs
use std::cell::Cell;

fn main() {
    let answer = Cell::new(42);

    assert_eq!(answer.get(), 42);

    answer.set(77);

    assert_eq!(answer.get(), 77);
}

Evet, answer değişkeni değişebilir olarak belirtilmemesine rağmen içeriği değişti!

Bu gayet emniyetli, çünkü içeriğindeki Cell içindeki veri yalnızca set veya get ile erişilebilir. Bunun adı iç değişebilirliktir (interior mutability): Eğer bir v isminde bir yapı (struct) tanımladıysam v isimli yapı değiştirilebilirse v.a da değiştirilebilir olur. Cell, biraz bu kuralı rahatlatıyor çünkü set aracılığıyla içeriğindeki değeri değiştirebiliyoruz.

Fakat, Cell sadece Copy özelliğine sahip tiplerle çalışır. (Mesela kullanıcının tanımadığı Copy özelliğine sahip tiplerle veyahut ilkel tiplerle)

Farklı türden verilerle çalışmak için referansa ihtiyaç duyarız, değişebilir ya da değişemez olması fark etmeksizin. İşte bize RefCell bize bu konuda yarımcı olur - içerideki veriye açıkça bir şekilde referanslarla erişirsiniz.

// refcell.rs
use std::cell::RefCell;

fn main() {
    let greeting = RefCell::new("hello".to_string());

    assert_eq!(*greeting.borrow(), "hello");
    assert_eq!(greeting.borrow().len(), 5);

    *greeting.borrow_mut() = "hola".to_string();

    assert_eq!(*greeting.borrow(), "hola");
}

Dikkat edin, greeting değişebilir bir değişken olarak bildirilmedi!

Rust'ın dereferans operatörü bir tık kafa karıştırıcı olabilir, çünkü çoğu zaman buna ihtiyaç duymazsınız - mesela greeting.borrow().len() diye çağırsaydık metot kendiliğinden dereferans edeceği için sorun olmazdı. Ancak içeriden gelen greeting.borrow() üzerinden gelen &String veya greeting.borrow_mut() üzerinden gelen &mut String ile çalışmaya devam edebilmek adına * eklemeniz gerekecektir.

RefCell kullanmak her zaman emniyetli değildir, hâlen daha geri dönen metotların temel kurallara riayet etmesi gerekmektedir.


#![allow(unused)]
fn main() {
    let mut gr = greeting.borrow_mut(); // gr is a mutable borrow
    *gr = "hola".to_string();

    assert_eq!(*greeting.borrow(), "hola"); // <== we blow up here!
....
thread 'main' panicked at 'already mutably borrowed: BorrowError'
}

Eğer değişebilir bir referansınız hâlihazırda varsa tekrardan değişebilir referans oluşturamazsınız. Fakat bu sefer bu kural ihlali derleme zamanında değil çalışma zamanında anlaşılabilir. Çözüm, her zaman olduğu gibi, değişebilir referansları mümkün olduğunca az sayıda tutmaktır - örneğimizdeki kod için değişebilir referansımız gr'ı kod blokları içerisinde tutmayı düşünebilirsiniz böylece referansımız tekrar ödünç almadan önce düşürülmüş olur.

Bu özellik iyi bir sebebiniz olmadan kullanmak isteyeceğiniz bir şey değil çünkü hatalarınızı derleme zamanında almayacaksınız. Bu tipler, olağan kuralların sizin işinizi engellediği ama doğrusunu ypatığınız durumlarda size dinamik ödünç alma sunmak için vardır.

Paylaşılan Referanslar

Şimdiye dek, değer ve ödünç alınmış referanslar derleme zamanında açıkça biliniyordu. Veri sahiptir, referanslar ise onsuz var olamaz. Fakat her şey bu düzgün desen tasarımına uygun değildir, mesela düşünelim ki Rol ve Oyuncu diye iki yapımız (struct) var. Oyuncu, Rol objelerine referanslar bulunan bir vektör tutuyor. Burada veriler arasında net bir şekilde birebir eşleşme yok ve rustc'yi buna ikna etmek işleri epeyce karmaşıklaştıracaktır.

Rc tıpkı Box gibi çalışır - heap belleği tahsis edilir ve veri bunun içine taşınır. Eğer bir Box klonlarsanız, içindeki klonlanmış veriyi de beraberinde tutan bir bellek alanı tahsis eder. Fakat bir Rc klonlamak bilgisayar için daha kolaydır, çünkü her klonlama yapacağınız zaman sadece verinin referans sayısını arttırır. Bu, bellek yönetimi için eski ve bilindik bir yöntemdir; mesela iOS ve MacOSlarda kullanılan Objective C'nin çalışma zamanında kullanılır. (Ç.N: Eğer ilgiliyseniz "Automatic Referance Counting" diye bir araştırma yapabilirsiniz.) Modern C++'da bu özellike str::shared_ptr aracılığıyla bulunabilir.

Bir Rcnin içinde bulunduğu kapsam sona erdiği zaman referans sayımı da bir azaltılır. Eğer bu sayı sıfır olursa bellek boşaltılır ve sahiplenilmiş olan veri de düşürülür.

// rc1.rs
use std::rc::Rc;

fn main() {
    let s = "hello dolly".to_string();
    let rs1 = Rc::new(s); // s moves to heap; ref count 1
    let rs2 = rs1.clone(); // ref count 2

    println!("len {}, {}", rs1.len(), rs2.len());
} // both rs1 and rs2 drop, string dies.

Orijinal veriye istediğiniz kadar çok referans alabilirsiniz - bu daha önce bahsettiğimiz dinamik ödünç almadır. T verisi ve onun referansları olan &T'yi dikkatlice takip etmek zorunda değilsiniz. Bunun karşılığında makine çalışma zamanında biraz daha yorulmuş olur, bu yüzden ilk seçiminiz bu olmamalıdır; fakat bu şekilde ödünç alma mekanizmasının sert kurallarına ters düşebilecek paylaşım tasarıları kurgulayabilirsiniz. Dikkat edin ki Rc size değiştirilemez referanslar sağlayacaktır, aksi taktirde en basit ödünç alma kuralını ihlal ediyor olurduk. Bilirsiniz, huylu huyundan vazgeçmez.

Oyuncu örneğinde, rollerimizi Vec<Rc<Rol>> olarak tutabiliriz ve böylece rol ekleyip çıkartabiliriz ancak oluşturulduktan sonra rolleri değiştiremeyiz.

Fakat, ya Oyuncu başka bir takıma referanslar bulunduruyorsa ve takım oyuncu referanslarını tek bir vektör içinde tutuyorsa? Her şey değiştirilemez olur çünkü Oyuncu verilerinin Rc olarak depolanması gerekir! Bu durumda RefCell gerekli olur. Bu sefer bütün takım Vec<Rc<RefCell<Oyuncu>>> içinde tutulabilir. Oyuncuyu borrow_mut, aynı anda birden çok değişebilir referans tutmayacağımızdan emin olarak değiştirebiliriz. Mesela oyuncuya özel bir şeyler olursa bütün takımın güçleneceğine dair güçlü bir kuralımız var:


#![allow(unused)]
fn main() {
    for p in &self.team {
        p.borrow_mut().make_stronger();
    }
}

Kod fena değil, ancak tipler biraz ürkünç görünebilir. Her zaman type ile onlara daha basit isimler atayabiliriz:


#![allow(unused)]
fn main() {
type PlayerRef = Rc<RefCell<Player>>;
}

Çoklu Sistem Süreçleri

Yaklaşık bir yirmi yıldır, saf işlem hızından çoklu çekirdeklere bir geçiş var. Son teknoloji bilgisayarlarımızdan tamamen verim almak için bütün çekirdekleri kullanmalıyız. Bunun en iyi yolu arkaplanda alt süreçler oluşturmaktır, tıpkı daha önceden gördüğümüz Command ancak bu sefer bir senkranizasyon sorunumuz var; bu alt süreçleri beklemeden onların tamamlanıp tamamlanmadığından emin değiliz.

Ayrı iş süreçlerine ihtiyaç duymamızın tek sebebi bu değil elbette, bütün programı sırf bir girdi almak için bekletemezsiniz, mesela.

Alt süreçler üretmek oldukça basit, sadece arkaplanda işletilecek spawn için bir kapama hazırlayın.

// thread1.rs
use std::thread;
use std::time;

fn main() {
    thread::spawn(|| println!("hello"));
    thread::spawn(|| println!("dolly"));

    println!("so fine");
    // wait a little bit
    thread::sleep(time::Duration::from_millis(100));
}
// so fine
// hello
// dolly

Kod satırında gördüğünüz "wait a little bit", "az bekle" demektir ve pek de mantıklı bir çözüme benzemiyor. Bize dönen nesnelerin üzerinde join çağırmak biraz daha mantıklı, ana süreç bu süreçlerin tamamlanmasını bekleyecektir.

// thread2.rs
use std::thread;

fn main() {
    let t = thread::spawn(|| {
        println!("hello");
    });
    println!("wait {:?}", t.join());
}
// hello
// wait Ok(())

İşte başka bir tür acayiplik: Alt süreci paniklemeye zorlayalım:


#![allow(unused)]
fn main() {
    let t = thread::spawn(|| {
        println!("hello");
        panic!("I give up!");
    });
    println!("wait {:?}", t.join());
}

Beklediğimiz gibi panikledi, ancak sadece panikleyen süreç öldü! Ekrana hata mesajını join ile yazabiliyoruz, yani evet her paniğin sonu programın kapatılması değildir; ancak süreçler bilgisayar için yorucu bir işlemdir ve bu süreçleri kontrol etmenin bir yolu olarak görülmelidir.

hello
thread '<unnamed>' panicked at 'I give up!', thread2.rs:7
note: Run with `RUST_BACKTRACE=1` for a backtrace.
wait Err(Any)

Dönen objeler çeşitli alt süreçlerini takip için etmek için kullanılabilir.

// thread4.rs
use std::thread;

fn main() {
    let mut threads = Vec::new();

    for i in 0..5 {
        let t = thread::spawn(move || {
            println!("hello {}", i);
        });
        threads.push(t);
    }

    for t in threads {
        t.join().expect("thread failed");
    }
}
// hello 0
// hello 2
// hello 4
// hello 3
// hello 1

Rust joinden dönen sonucu ele almamız için bize ısrar eder, mesela alt süreç panikleyebilir. (Bu gerçekleştiği zaman genelde programın tamamını durdurmazsınız, sadece hataları not edersiniz, yeniden denersiniz vs.)

Alt süreçlerin işleyişi için belirli bir sıra yoktur (Program her çalışmada farklı bir sıra verir), ve buna esas noktadır - onlar gerçekten bağımsız yürütme süreçleridir (independent threads of execution. Çoklu süreçler kolaydır, esas olay eşzamanlılıktır (concurrency) - birden çok sürecin süreci senkronize etmek ve yönetmek.

Süreçler Ödünç Almaz

Süreç kapamaları dışarıdan veri alabilir, ancak taşıyarak, ödünç alarak değil!

// thread3.rs
use std::thread;

fn main() {
    let name = "dolly".to_string();
    let t = thread::spawn(|| {
        println!("hello {}", name);
    });
    println!("wait {:?}", t.join());
}

Ve işte karşınızda size yardımcı olan hata mesajı:

error[E0373]: closure may outlive the current function, but it borrows `name`, which is owned by the current function
 --> thread3.rs:6:27
  |
6 |     let t = thread::spawn(|| {
  |                           ^^ may outlive borrowed value `name`
7 |         println!("hello {}", name);
  |                             ---- `name` is borrowed here
  |
help: to force the closure to take ownership of `name` (and any other referenced variables), use the `move` keyword, as shown:
  |     let t = thread::spawn(move || {

Anlaşılabilir! Bir fonksiyon içerisinde üretilen süreci düşünün - fonksiyon çağrısı sona erdikten sonra bile çalışmaya devam eder ve name dürüşürülebilir. Kapamamıza move eklemek bu sorunu çözecektir.

Fakat bu bir taşımadır, yani name sadece bir süreçte var olabilr! Referansları paylaşmanın mümkün olduğunu vurgulamak istiyorum, ancak static ömre sahip olmalıdırlar:


#![allow(unused)]
fn main() {
let name = "dolly";
let t1 = thread::spawn(move || {
    println!("hello {}", name);
});
let t2 = thread::spawn(move || {
    println!("goodbye {}", name);
});
}

name, bütün programın çalışma süresince var olacaktır (static), bu yüzden rustc kapama çalıştığı sürece name değerinin de varlığından emin olacaktır. Ancak, havalı referansların static ömürleri yoktur!

Alt süreçler ortak bir ortamı paylaşamazlar - bu Rust'ın kendi tarzıdır. Biraz detay girersek, olağan referansları paylaşamazlar çünkü kapamalar yakaladıkları verileri taşırlar.

Yine de Paylaşılan referanslar fena değil çünkü yaşam ömürleri "gerektiği kadardır" - ama bunun için Rc kullanamazsınız. Çünkü Rc alt süreçler arası emniyete sahip değildir (thread safe) - sadece süreçlerin olmadığı anlar için hızlı olmaya optimize edilmiştir. Neyse ki Rc kullanırsanız derleme zamanında bir hata alırsınız, derleyici sizin arkanızı kollar.

Alt süreçler için std::sync::Arc kullanmalısınız - "Arc"ın açılımı "Atomic Referance Counting" yani "Atomik Referans Sayımıdır". Hepsi bu, bu dost her şeyin tek bir işlemde değiştirileceğini garanti eder. Bu garantiyi sağlamak için de işlem gerçekleştirilirken kilitlenir ve o an sadece bir sürecin erişimine izin verilir. clone kullanmak, bir kopya üretmekten bilgisayar için daha zahmetsizdir. (Ç.N: Buradaki clone, baştan aşağıya bellekte veri kopyalayan std::clone değil, referans klonlayan std::sync::Arc::clone metotudur.)

// thread5.rs
use std::thread;
use std::sync::Arc;

struct MyString(String);

impl MyString {
    fn new(s: &str) -> MyString {
        MyString(s.to_string())
    }
}

fn main() {
    let mut threads = Vec::new();
    let name = Arc::new(MyString::new("dolly"));

    for i in 0..5 {
        let tname = name.clone();
        let t = thread::spawn(move || {
            println!("hello {} count {}", tname.0, i);
        });
        threads.push(t);
    }

    for t in threads {
        t.join().expect("thread failed");
    }
}

Bilinçli olarak String barındıran bir tip oluşturdum, ("newtype" ya da yenitür) çünkü MyString, Clone özelliğini taşımayacak. Fakat paylaşılan referanslar klonlanabilir!

name'e ait paylaşılan referanslar her yeni alt sürece clone ile oluşturulan yeni referanslar olarak iletilir ve kapama içerisine taşınır. Biraz fazla kod yazdırıyor, ancak bu emniyetli bir örüntüdür. Emniyet, eşzamanlılık için epey önemlidir çünkü sorunlar pek öngörülebilir değildir. Program sizin bilgisayarınızda düzgünce çalışsa bile bir sunucuda patlayabilir, mesela haftasonu keyif yaparken. Daha da kötüsü, problemi semptomları bir tanı koymanıza yardımcı olmayabilir.

Kanallar

Süreçler arasında veri paylaşmanın çeşitli yolları var. Rust içerisinde, bunlardan birisi kanalları kullanmaktır. std::sync::mpsc::channel(), alıcı kanal ve verici kanal olmak üzere bize iki veri tutan bir demet sunar. Her bir alt sürece verici clone ile yollanır, ve referans üzerinde send çağrılır. Ana süreç ise bu esnada alıcı üzerinde recvi çağırır.

MPSC'nin açılımı "Multiple Producer Single Consumer"dır, yani "Çoklu üretici, tek tüketici". Kanala veri yollamaya teşebbüs eden birden çok altsüreç oluşturacağız, ana sürecimiz de kanalı "tüketecek".

// thread9.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let nthreads = 5;
    let (tx, rx) = mpsc::channel();

    for i in 0..nthreads {
        let tx = tx.clone();
        thread::spawn(move || {
            let response = format!("hello {}", i);
            tx.send(response).unwrap();
        });
    }

    for _ in 0..nthreads {
        println!("got {:?}", rx.recv());
    }
}
// got Ok("hello 0")
// got Ok("hello 1")
// got Ok("hello 3")
// got Ok("hello 4")
// got Ok("hello 2")

Bu sefer join kullanmaya ihtiyacımız yok çünkü alt süreçler kendilerini sonlandırmasından sonra cevaplarını dönecektir, fakat bu her an gerçekleşebilir. recv süreci kilitleyecektir ve eğer yollayıcı kanal devredışı kalacaksa bir hata dönecektir. recv_timeout ise belli bir süre boyunca bloklayacaktır ve ek olarak bir de zamanaşımı hatası dönebilecektir.

send ise süreci kilitlemeyecektir, bu faydalıdır çünkü süreçler mesajın alınmasını beklemeye gerek duymaksızın bir veriyi kanala atıp devam edecektir. Ek olarak, kanalın bir belleği vardır, böylece birden çok send metodu çalışabilir ve alıcıya sırasıyla mesaj iletilecektir.

Fakat, sürecin engellenmemesi aynı zamanda Ok değerinin mesajın başarıyla iletildiği anlamına gelmediğini de işaret eder.

sync_channel ise kanala veri yollarken süreci kilitler. Argüman sıfır olursa, alıcı mesajı recv ile alana kadar süreç kilitlenir. Süreçler ya buluşmalıdır ya da randevulaşmalıdır. (Her zaman yabancı kökenli kelimeler kulağa bir tık daha teknik ve doğru gelir.)


#![allow(unused)]
fn main() {
    let (tx, rx) = mpsc::sync_channel(0);

    let t1 = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
    });

    for _ in 0..5 {
        let res = rx.recv().unwrap();
        println!("{}",res);
    }
    t1.join().unwrap();
}

Burada send kullanılmamışken recv kullanarak bir hataya kolayca sebep olabilir, mesela döngüyü for i in 0..5 ile kurmak yerine for i in 0..4 kullanarak. Süreç sona erer, tx düşer ve recv başarısız olur. Bu aynı zamanda bir süreç paniklediği zaman da gerçekleşir, stack yavaşça çözülür ve bütün veriler düşürülür.

Eğer sync_channel sıfır olmayan bir argümanla oluşturulursa, buna n diyelim, bu sefer en fazla n değeri alan bir sıra (queue) gibi davranır, send sadece sırada bekleyen nden fazla değer varsa süreci kilitleyecektir.

Kanallar güçlü tip (strongly type) mantığına uygundur, bu örnekte kanalın tipi i32dir, ancak tip çıkarımı bunu biraz gizler. Eğer farklı türden verilere ihtiyacınız varsa, numaralandırmalar (enum) bunu ifade etmek için uygundur.

Senkronizasyon

Senkranizasyona bakalım. join oldukça basit, tek işi bir iş parçacığı bitene kadar beklemek. sync_channel ise iki kanalı birbirine senkronize ediyor - son örneğimizde üretilen alt süreç ve ana süreç tamamen birbirine kilitlenmişti.

Bariyer senkronizasyonu, bütün süreçlerin bir noktaya geldiği zaman diğer süreçlerin beklemesini içerir, sonra yollarına devam ederler. Bariyer, beklemesini istediğimiz süreçlerin toplam sayısıyla oluşturulur. Daha önce olduğu gibi Arc aracılığıyla bu bariyeri diğer altsüreçlerle paylaşabilirsiniz.

// thread7.rs
use std::thread;
use std::sync::Arc;
use std::sync::Barrier;

fn main() {
    let nthreads = 5;
    let mut threads = Vec::new();
    let barrier = Arc::new(Barrier::new(nthreads));

    for i in 0..nthreads {
        let barrier = barrier.clone();
        let t = thread::spawn(move || {
            println!("before wait {}", i);
            barrier.wait();
            println!("after wait {}", i);
        });
        threads.push(t);
    }

    for t in threads {
        t.join().unwrap();
    }
}
// before wait 2
// before wait 0
// before wait 1
// before wait 3
// before wait 4
// after wait 4
// after wait 2
// after wait 3
// after wait 0
// after wait 1

Süreçler yine yarı-rastgele çalışırken bir anda birleşiyorlar ve sonra tekrar devam ediyorlar. Bu, devam ettirilebilir bir join gibidir ve bütün süreçlerin belli bir işi yaptıktan sonra o işle devam etmesini istediğinizde kullanışlı olabilir.

Paylaşılmış Durumlar

Süreçler, kendi paylaşılmış durum bilgisini nasıl düzenler?

Aklınıza dinamik olarak paylaşılan değişebilir referans almak için kullandığımız Rc<RefCell<T>> stratejisini getirin. RefCellin süreçlerde kullanılan muadili ise Mutex - değişken referansı lock kullanarak alabilirsiniz. Referans var olduğu müddetçe diğer süreçler veriye erişemeyecektir. mutex'in açılımı "Mutual Exclusion" yani "Karşılıklı Hariciyet" - bir sürecin erişmesi için kodun ilgili kısmını kilitliyoruz ve ardından kilidini açıyoruz. lock ile kilitlersiniz ve referans düşünce de kilit kalkar.

// thread9.rs
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;

fn main() {
    let answer = Arc::new(Mutex::new(42));

    let answer_ref = answer.clone();
    let t = thread::spawn(move || {
        let mut answer = answer_ref.lock().unwrap();
        *answer = 55;
    });

    t.join().unwrap();

    let ar = answer.lock().unwrap();
    assert_eq!(*ar, 55);

}

RefCell kadar kolay değil çünkü eğer kilidin olduğu bir süreç paniklerse mutex daima kilitli kalabilir. (Böyle bir durumda, dokümentasyon açıkça süreci unwrap ile terk etmeniz gerektiğini söyler çünkü bir şeyler çok yanlış gitmiştir.)

Bu sefer değişebilir referansları mümkün olduğunca az tutmak çok daha önemli; çünkü bu mutex kapalı kaldıkça diğer süreçler de bloklanacaktır. Bu, kilitleyip bilgisayar için zor hesaplamaların yapmanın yeri bu değil! Yani, muhtemelen kodunuz şuna benzeyecek:


#![allow(unused)]
fn main() {
// ... do something in the thread
// get a locked reference and use it briefly!
{
    let mut data = data_ref.lock().unwrap();
    // modify data
}
//... continue with the thread
}

Yüksek Seviyeli İşlem

Belki de süreçleri yönetmenin daha yüksek seviyeli bir yolunu bulmak, süreçleri tek tek elle kontrol etmekten daha iyidir. Bunun bir örneği hesaplamaları paralel olarak yaptırmak ve sonuçları toplamak olabilir. Epey enteresan bir sandık olarak pipeliner1 sandığına bakabilirsiniz ki çok anlaşılır bir API'ya sahiptir. Deneysel bir "Merhaba Dünya"ye ne dersiniz? - bize çıktılar veren bir döngüleyici kurgulayalım ve n adet işlemi paralel olarak çalıştıralım: 1: Görünüşe göre pipeliner sandığı Şubat 2020'den beri güncelleme almamış

extern crate pipeliner;
use pipeliner::Pipeline;

fn main() {
    for result in (0..10).with_threads(4).map(|x| x + 1) {
        println!("result: {}", result);
    }
}
// result: 1
// result: 2
// result: 5
// result: 3
// result: 6
// result: 7
// result: 8
// result: 9
// result: 10
// result: 4

Salakça bir örnek olduğunun farkındayız, çünkü ilgili operasyon zaten bilgisayar için zahmetsiz ancak paralel işlemler gerçekleştirmenin ne derece kolay olabileceğini göstermiş oldum.

Daha kullanışlı bir şey yapalım. Ağ işlemlerini paralel olarak gerçekleştirmek faydalı olabilir, çünkü genellikle uzun zaman alırlar ve işe başlamak için hepsinin tamamlanmasını beklemeyi istemezsiniz.

Örneğimiz biraz kötü (emin olun yapmanın çok daha iyi yolları var) ancak ne işe odaklanın. 4. Bölümde tanımladığımız shell fonksiyonunu belli bir aralıktaki IP4 adreslerine ping atmak için tekrar kullanacağız:

extern crate pipeliner;
use pipeliner::Pipeline;

use std::process::Command;

fn shell(cmd: &str) -> (String,bool) {
    let cmd = format!("{} 2>&1",cmd);
    let output = Command::new("/bin/sh")
        .arg("-c")
        .arg(&cmd)
        .output()
        .expect("no shell?");
    (
        String::from_utf8_lossy(&output.stdout).trim_right().to_string(),
        output.status.success()
    )
}

fn main() {
    let addresses: Vec<_> = (1..40).map(|n| format!("ping -c1 192.168.0.{}",n)).collect();
    let n = addresses.len();

    for result in addresses.with_threads(n).map(|s| shell(&s)) {
        if result.1 {
            println!("got: {}", result.0);
        }
    }

}

Kendi ev ağımda sonuç şöyle bir şey:

got: PING 192.168.0.1 (192.168.0.1) 56(84) bytes of data.
64 bytes from 192.168.0.1: icmp_seq=1 ttl=64 time=43.2 ms

--- 192.168.0.1 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 43.284/43.284/43.284/0.000 ms
got: PING 192.168.0.18 (192.168.0.18) 56(84) bytes of data.
64 bytes from 192.168.0.18: icmp_seq=1 ttl=64 time=0.029 ms

--- 192.168.0.18 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.029/0.029/0.029/0.000 ms
got: PING 192.168.0.3 (192.168.0.3) 56(84) bytes of data.
64 bytes from 192.168.0.3: icmp_seq=1 ttl=64 time=110 ms

--- 192.168.0.3 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 110.008/110.008/110.008/0.000 ms
got: PING 192.168.0.5 (192.168.0.5) 56(84) bytes of data.
64 bytes from 192.168.0.5: icmp_seq=1 ttl=64 time=207 ms
...

Aktif adresler hızlıca bize bir saniyenin yarısı kadar bir sürede bize cevap verebiliyor, gerisi de olumsuz cevapları beklemek oluyor. Eğer paralel olarak çalıştırmasaydık, yaklaşık bir dakika boyunca sonuçları beklemek zorunda kalırdık! Ping zamanı gibi çıktıdan analiz ederek bulmayı düşünebilirsiniz, ancak bu sadece Linux üzerinde işe yarabilirdi. ping her sistemde vardır ancak çıktı her platform için değişiklik gösterebilir. Eğer Rust ile çalışacak evrensel br ağ API hizmeti tasarlamak isterseniz, ağlar kısmına geçiş yapabiliriz.

Adresleri Çözümlemenin Daha İyi Bir Yolu

Eğer sadece neyin aktif olduğu bilmek istiyorsanız ve gelişmiş ping istatistikleri ilginizi çekmiyorsa, std::net::ToSocketAddrs özelliği sizin için DNS çözümlemesi yapacaktır.

use std::net::*;

fn main() {
    for res in "google.com:80".to_socket_addrs().expect("bad") {
        println!("got {:?}", res);
    }
}
// got V4(216.58.223.14:80)
// got V6([2c0f:fb50:4002:803::200e]:80)

Bu bir döngüleyicidir çünkü genellikle bir alan adına birden çok arayüz bağlıdır, ikisi de Google'un arayüzleridir; birisi IPv4 ve diğeri IPv6 olmak üzere.

Şimdi pipeliner örneğimizi masumca bu metotu yeniden yazmak için kullanabiliriz. Çoğu ağ protokolü hem bir adres hem de bir port kullanır:

extern crate pipeliner;
use pipeliner::Pipeline;

use std::net::*;

fn main() {
    let addresses: Vec<_> = (1..40).map(|n| format!("192.168.0.{}:0",n)).collect();
    let n = addresses.len();

    for result in addresses.with_threads(n).map(|s| s.to_socket_addrs()) {
        println!("got: {:?}", result);
    }
}
// got: Ok(IntoIter([V4(192.168.0.1:0)]))
// got: Ok(IntoIter([V4(192.168.0.39:0)]))
// got: Ok(IntoIter([V4(192.168.0.2:0)]))
// got: Ok(IntoIter([V4(192.168.0.3:0)]))
// got: Ok(IntoIter([V4(192.168.0.5:0)]))
// ....

Bu, ping yollamaktan çok daha hızlıdır çünkü sadece bir IP adresinin geçerli olup olmadığını ölçüyoruz, eğer bir gerçek alan adlarının bir listesini kullansaydık DNS araştırması epey vakit alabilirdi ve bu paralleliğin nasıl önemli olabileceğini bize gösteriyor.

İlginç bir şekilde, bu "çalıştır ve unut" mantığında işliyor. Standart kütüphanede bulunan ve Debug barındıran her şey, hata ayıklamak için de müthiş keşifler sunar. Dönngüleyici Result (hâliyle Ok) dönüyor ve bu Result, IPv4 veyahut IPv6 varyantları olan bir numalandırma olan SocketAddr barındıran bir IntoIter içeriyor. Peki neden IntoIter? Çünkü bir soketin birden çok adresi olabilir, (hem IPv4 hem de IPv6 adresi olması gibi.)


#![allow(unused)]
fn main() {
    for result in addresses.with_threads(n)
        .map(|s| s.to_socket_addrs().unwrap().next().unwrap())
    {
        println!("got: {:?}", result);
    }
// got: V4(192.168.0.1:0)
// got: V4(192.168.0.39:0)
// got: V4(192.168.0.3:0)
}

Bu da çalışıyor, ilginç olarak, bizim basit örneğimiz kadar işe yarıyor. İlk unwrap Result'tan kurtuluyor ve döngüleyiciden ilk çıkan değeri dışarı çıkartıyor. Result genellikle bu durumda anlamsız adreslerde tetiklenir. (Mesela portu olmayan adres isimleri gibi.)

TCP İstemci Sunucusu

Rust, en çok kullanılan ve en yaygın ağ protokolü için gayet makul bir atayüz de sunar; TCP. TCP, hatalara karşı oldukça dayanıklıdır ve ağlarla örülü dünyamızın temel taşıdır - paketler onaylanarak gönderilir ve onaylanarak alınır. Bunun tersi olarak UDP ise paketleri hiçbir onay olmadan yollar. Bunun hakkında şöyle garabet bir espri vardır: "Sana UDP hakkında bir fıkra anlatabilirim ama muhtemelen kafan almayacak." (Ağlar hakkındaki espriler komiktir, sizin komikten ne anladığınıza göre değişir tabii.)

Fakat, hata kontrolü ağlarla uğraşırken çok önemlidir çünkü her an her şey bir anlığına oluşabilir.

TCP, bir istemci/sunucu modeliyle çalışır; sunucu adresi belli bir ağ portundan dinler ve istemci de sunucuya bağlanır. Bağlantı oluşturulduğunda ise istemci ve sunucu bir soket üzerinden haberleşebilir.

TcpStream::connect, SoccetAddr'a dönüştürülebilecek her şeyi kabul eder, kullandığımız düz karakter dizilerini de.

Rust'ta basit bir TCP istemcisi yazmak oldukça kolaydır - TcpStream yapısı hem yazılabilir hem de okunabilirdir. Her zaman olduğu gibi, özellikleri kullanmak için, Read, Write ve diğer std::io özelliklerini kapsamda görünür kılmalıyız.

// client.rs
use std::net::TcpStream;
use std::io::prelude::*;

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8000").expect("connection failed");

    write!(stream,"hello from the client!\n").expect("write failed");
 }

Sunucumuz pek karmaşık değil, bir dinleyici kuruyoruz ve bağlantıları bekliyoruz. Eğer bir istemci bağlanırsa, sunucu tarafında TcpStream elde ederiz. Bu örnekte sunucuya gelen her şey bir karakter dizisine yazılmış olur.

// server.rs
use std::net::TcpListener;
use std::io::prelude::*;

fn main() {

    let listener = TcpListener::bind("127.0.0.1:8000").expect("could not start server");

    // accept connections and get a TcpStream
    for connection in listener.incoming() {
        match connection {
            Ok(mut stream) => {
                let mut text = String::new();
                stream.read_to_string(&mut text).expect("read failed");
                println!("got '{}'", text);
            }
            Err(e) => { println!("connection failed {}", e); }
        }
    }
}

Port numarasını aşağı yukarı rastgele geçtim, ancak pek çok port özel anlamlar barındırır.

İki tarafında protokol üzerinde uzlaştığına dikkat edin. İstemci akışa yazı yazabileceğini düşünür ve sunucu da akıştan yazıyı okuyabileceğini umar. Eğer oyunu aynı kurallarla oynamazlarsa, bir tarafın engellendiği ve hiç gelmeyecek bir cevabı beklediği durumlar oluşur.

Hataların kontrolü önemlidir - ağ girdi çıktıları çeşitli sebeplerden aksayabilir ve hatalar dolunayda dosya sisteminin kurtadama dönüşmesi gibi acayip sebeplerlerl de geçerkleşebilir. Birileri kablolarla ip atlayabilir, öbür tarafın bilgisayarı çökebilir, falan fistan. Yazdığımız ufak sunucu çok da sağlam değil, çünkü ilk hata çökecektir.

Hataları düzgünce ele alan güçlü bir sunucu aşağıda bulunmaktadır. Akıştan bir satırı bir io::BufRead üreten bir io::BufReader aracılığıyla okur ve böylece çıktı üzerinde read_line çağırabiliriz.

// server2.rs
use std::net::{TcpListener, TcpStream};
use std::io::prelude::*;
use std::io;

fn handle_connection(stream: TcpStream) -> io::Result<()>{
    let mut rdr = io::BufReader::new(stream);
    let mut text = String::new();
    rdr.read_line(&mut text)?;
    println!("got '{}'", text.trim_right());
    Ok(())
}

fn main() {

    let listener = TcpListener::bind("127.0.0.1:8000").expect("could not start server");

    // accept connections and get a TcpStream
    for connection in listener.incoming() {
        match connection {
            Ok(stream) => {
                if let Err(e) = handle_connection(stream) {
                    println!("error {:?}", e);
                }
            }
            Err(e) => { print!("connection failed {}\n", e); }
        }
    }
}

handle_connection içindeki read_line çökebilir ancak sonuçta hata emniyetli bir şekilde kontrol edilmiş olur.

Bu tarz tek yönlü iletişimler bazen kullanışlı olabilir - bu örnekte olduğu gibi. Ağda bulunan servislerin durumlarını, merkezi bir yerde toplamış oluyoruz. Fakat kibarca bir şekilde geri dönüş yapsak iyi olur, en azından "ok" deyip geçelim.

İşte bir "eko" sunucusu. İstemci, sonunda satır sonu işareti olan bir metni sunucuya yollar ve sunucuda ek bir satır sonu ekleyerek geri yollar - akış okunabilir ve yazılabilirdir.

// client_echo.rs
use std::io::prelude::*;
use std::net::TcpStream;

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8000").expect("connection failed");
    let msg = "hello from the client!";

    write!(stream,"{}\n", msg).expect("write failed");

    let mut resp = String::new();
    stream.read_to_string(&mut resp).expect("read failed");
    let text = resp.trim_right();
    assert_eq!(msg,text);
}

Sunucu şimdi ilginç bir hâl aldı. Sadece handle_connectionu değiştirelim:


#![allow(unused)]
fn main() {
fn handle_connection(stream: TcpStream) -> io::Result<()>{
    let mut ostream = stream.try_clone()?;
    let mut rdr = io::BufReader::new(stream);
    let mut text = String::new();
    rdr.read_line(&mut text)?;
    ostream.write_all(text.as_bytes())?;
    Ok(())
}
}

Bu yaygın olarak kullanılan yaygın br çift taraflı soket iletişimidir; BufReader'a yollamak için bir satır istiyoruz - fakat "BufReader" akışı tüketiyor! Bu yüzden akışı klonlamalıyız ve aynı soketi işaret eden yeni bir yapı oluşturmalıyız. Böylece nihayet huzuru elde ediyoruz.