gniazda Java I / O: blokujące, nieblokujące i asynchroniczne

Photo by Evi Radauscher on Unsplash

przy opisie We/Wy, terminy nieblokujące i asynchroniczne są często używane zamiennie, ale istnieje znacząca różnica między nimi. W tym artykule opisano teoretyczne i praktyczne różnice między nieblokującymi i asynchronicznymi operacjami We/Wy gniazd w Javie.

gniazda są punktami końcowymi do dwukierunkowej komunikacji za pomocą protokołów TCP i UDP. Interfejsy API gniazd Java to adaptery do odpowiedniej funkcjonalności systemów operacyjnych. Komunikacja gniazd w systemach operacyjnych zgodnych z POSIX (Unix, Linux, Mac OS X, BSD, Solaris, AIX, itp.) jest wykonywana przez Berkeley sockets. Komunikacja gniazd w systemie Windows jest wykonywana przez Winsock, który jest również oparty na gniazdach Berkeley z dodatkowymi funkcjami zgodnymi z modelem programowania Windows.

definicje POSIX

w tym artykule wykorzystano uproszczone definicje ze specyfikacji POSIX.

zablokowany wątek-wątek, który czeka na jakiś warunek, zanim będzie mógł kontynuować wykonywanie.

Blocking-właściwość gniazda, która powoduje, że wywołania do niego oczekują na wykonanie żądanej akcji przed powrotem.

Non-blocking — właściwość gniazda, która powoduje, że wywołania do niego zwracają się niezwłocznie, gdy zostanie wykryte, że żądana akcja nie może zostać wykonana bez nieznanego opóźnienia.

Synchroniczna operacja We/Wy-operacja We/Wy, która powoduje zablokowanie żądanego wątku do czasu zakończenia tej operacji We / Wy.

asynchroniczna operacja We/Wy — operacja We/Wy, która sama w sobie nie powoduje zablokowania żądanego wątku; oznacza to, że wątek i operacja We / Wy mogą działać jednocześnie.

tak więc, zgodnie ze specyfikacją POSIX, różnica między pojęciami nieblokującymi a asynchronicznymi jest oczywista:

  • non-blocking — właściwość gniazda, która powoduje, że wywołania do niego zwracają bez opóźnień
  • asynchroniczne We/Wy — właściwość operacji We/Wy (odczytu lub zapisu), która działa jednocześnie z żądającym wątkiem

modele We/Wy

następujące modele We/Wy są najczęstsze dla operacji zgodnych z POSIX systemy:

  • blokowanie modelu We/Wy
  • nieblokujący model We/Wy
  • model multipleksowania We/Wy
  • model We/Wy napędzany sygnałem
  • asynchroniczny model We/Wy

blokowanie modelu We/Wy

w modelu blokowania we/wy aplikacja wykonuje blokujące wywołanie systemowe, dopóki dane nie zostaną odebrane do jądra i skopiowane z przestrzeni jądra do przestrzeni użytkownika.

blokowanie modelu We / Wy

 blokowanie modelu We / Wy

plusy:

  • najprostszy model we/wy do wdrożenia

wady:

  • aplikacja jest zablokowana

nieblokujący model We/Wy

w nieblokującym modelu we / wy aplikacja wykonuje wywołanie systemowe, które natychmiast zwraca jedną z dwóch odpowiedzi:

  • jeśli operacja We/Wy może zostać zakończona natychmiast, dane są zwracane
  • jeśli operacja We/Wy nie może zostać zakończona natychmiast, zwracany jest kod błędu wskazujący, że operacja We/Wy zostanie zablokowana lub urządzenie jest tymczasowo niedostępne

aby zakończyć operację we/wy, aplikacja powinna czekać na zakończenie (wykonywać powtarzalne wywołania systemowe).

nieblokujący model We/Wy

nieblokujący model We / Wy

plusy:

  • aplikacja nie jest zablokowana

wady:

  • aplikacja powinna czekać na zakończenie, co spowodowałoby wiele przełączników kontekstowych jądra użytkownika
  • ten model może wprowadzić opóźnienie we/wy, ponieważ może istnieć luka między dostępnością danych w jądrze a odczytem danych przez aplikację

model multipleksowania We/Wy

w modelu multipleksowania We/Wy (znanym również jako nieblokujący model we/wy z blokującymi powiadomieniami), aplikacja wykonuje wywołanie systemowe blocking SELECT, aby rozpocząć monitorowanie aktywności na wielu deskryptorach. Dla każdego deskryptora można zażądać powiadomienia o jego gotowości do określonych operacji We / Wy (połączenie, Odczyt lub zapis, wystąpienie błędu itp.). Gdy wywołanie systemowe select zwraca, że co najmniej jeden deskryptor jest gotowy, aplikacja wykonuje wywołanie nieblokujące i kopiuje dane z przestrzeni jądra do przestrzeni użytkownika.

model multipleksowania We / Wy

 model multipleksowania We/Wy

plusy:

  • możliwe jest wykonywanie operacji We/Wy na wielu deskryptorach w jednym wątku

Cons:

  • aplikacja jest nadal blokowana w wybranym wywołaniu systemowym
  • nie wszystkie systemy operacyjne skutecznie obsługują ten model

Model We/Wy sterowany sygnałem

w modelu We/Wy sterowanym sygnałem aplikacja wykonuje nieblokujące wywołanie i rejestruje moduł obsługi sygnału. Gdy deskryptor jest gotowy do operacji We / Wy, generowany jest sygnał dla aplikacji. Następnie obsługa sygnału kopiuje dane z przestrzeni jądra do przestrzeni użytkownika.

model We/Wy napędzany sygnałem

 model We/Wy napędzany sygnałem

plusy:

  • aplikacja nie jest zablokowana
  • sygnały mogą zapewnić dobrą wydajność

wady:

  • nie wszystkie systemy operacyjne obsługują sygnały

asynchroniczny model We/Wy

w asynchronicznym modelu We/Wy (znanym również jako model we/wy) aplikacja wykonuje nieblokujące wywołanie i uruchamia operację w tle w jądrze. Po zakończeniu operacji (dane są odbierane do jądra i kopiowane z przestrzeni jądra do przestrzeni użytkownika), generowane jest wywołanie zwrotne zakończenia w celu zakończenia operacji We/Wy.

różnica między asynchronicznym modelem We/Wy a modelem We/Wy napędzanym sygnałem polega na tym, że w przypadku We/Wy napędzanych sygnałem jądro informuje aplikację, kiedy operacja We/Wy może zostać zainicjowana, ale w przypadku asynchronicznego modelu We/Wy jądro informuje aplikację, kiedy operacja We / Wy została zakończona.

asynchroniczny model We/Wy

 asynchroniczny model We/Wy

plusy:

  • aplikacja nie jest zablokowana
  • ten model może zapewnić najlepszą wydajność

wady:

  • najbardziej skomplikowany model we/wy do wdrożenia
  • nie wszystkie systemy operacyjne skutecznie obsługują ten model

Java i / o API

Java IO API opiera się na strumieniach (InputStream, OutputStream), które reprezentują blokujący, jednokierunkowy przepływ danych.

Java NIO API

Java NIO API opiera się na klasach kanałów, buforów, selektorów, które są adapterami do operacji We/Wy niskiego poziomu systemów operacyjnych.

Klasa Channel reprezentuje połączenie z jednostką (urządzeniem sprzętowym, plikiem, gniazdem, komponentem oprogramowania itp.), która jest zdolna do wykonywania operacji We/Wy (odczyt lub zapis).

w porównaniu ze strumieniami jednokierunkowymi, kanały są dwukierunkowe.

Klasa bufora jest kontenerem danych o stałej wielkości z dodatkowymi metodami odczytu i zapisu danych. Wszystkie dane kanału są obsługiwane przez bufor, ale nigdy bezpośrednio: wszystkie dane, które są wysyłane do kanału, są zapisywane do bufora, wszystkie dane, które są odbierane z kanału, są odczytywane do bufora.

w porównaniu ze strumieniami, które są zorientowane bajtowo, kanały są zorientowane blokowo. We/Wy zorientowane na bajty jest prostsze, ale dla niektórych jednostek we/wy może być raczej wolne. We/Wy zorientowane na bloki mogą być znacznie szybsze, ale bardziej skomplikowane.

Klasa Selector umożliwia zapisywanie zdarzeń z wielu zarejestrowanych obiektów SelectableChannel w jednym wywołaniu. Po pojawieniu się zdarzeń obiekt selektora wysyła je do odpowiednich programów obsługi zdarzeń.

Java NIO2 API

Java Nio2 API opiera się na kanałach asynchronicznych (asynchronousserversocketchannel, AsynchronousSocketChannel, itp.), które obsługują asynchroniczne operacje we/wy (łączenie, odczyt lub zapis, obsługa błędów).

kanały asynchroniczne zapewniają dwa mechanizmy kontroli asynchronicznych operacji We/Wy. Pierwszym mechanizmem jest zwrócenie Javy.util.równoległe.Obiekt Future, który modeluje oczekującą operację i może być użyty do zapytania stanu i uzyskania wyniku. Drugim mechanizmem jest przekazanie do operacji Javy.nio.kanały.Obiekt CompletionHandler, który definiuje metody obsługi, które są wykonywane po zakończeniu lub niepowodzeniu operacji. Dostarczone API dla obu mechanizmów są równoważne.

kanały asynchroniczne zapewniają standardowy sposób wykonywania operacji asynchronicznych niezależnie od platformy. Jednak ilość, jaką Java sockets API może wykorzystać natywne asynchroniczne możliwości systemu operacyjnego, zależy od wsparcia dla tej platformy.

Socket echo server

większość modeli We/Wy wymienionych powyżej jest zaimplementowana tutaj w serwerach echo i klientach z interfejsami API gniazd Java. Serwery i klienci echo działają według następującego algorytmu:

  1. serwer nasłuchuje gniazda na zarejestrowanym porcie TCP 7000
  2. klient łączy się z gniazda na dynamicznym porcie TCP do gniazda serwera
  3. klient odczytuje łańcuch wejściowy z konsoli i wysyła bajty ze swojego gniazda do gniazda serwera
  4. serwer odbiera bajty ze swojego gniazda i odsyła je z powrotem do gniazda klienta
  5. klient odbiera bajty ze swojego gniazda i zapisuje echoed String na konsoli
  6. gdy klient otrzymuje taką samą liczbę bajtów, jaką wysłał, odłącza się od serwera
  7. gdy serwer otrzymuje specjalny ciąg znaków, przestaje nasłuchiwać

konwersja między łańcuchami i bajtami jest tutaj wykonywana jawnie w kodowaniu UTF-8.

dalsze tylko uproszczone kody dla serwerów echo są dostarczane. Link do kompletnych kodów dla serwerów i klientów echo znajduje się we wniosku.

blokowanie serwera Io echo

w poniższym przykładzie model blokowania We/Wy jest zaimplementowany w serwerze echo z Java IO API.

Serwery.accept blokuje metody, dopóki połączenie nie zostanie zaakceptowane. InputStream.odczyt metody blokuje się, dopóki dane wejściowe nie będą dostępne lub klient nie zostanie odłączony. OutputStream.metoda write blokuje, dopóki nie zostaną zapisane wszystkie dane wyjściowe.

public class IoEchoServer { public static void main(String args) throws IOException {
ServerSocket serverSocket = new ServerSocket(7000); while (active) {
Socket socket = serverSocket.accept(); // blocking InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream(); int read;
byte bytes = new byte;
while ((read = is.read(bytes)) != -1) { // blocking
os.write(bytes, 0, read); // blocking
} socket.close();
} serverSocket.close();
}
}

blokowanie serwera NIO echo

w poniższym przykładzie model blokowania I/O jest zaimplementowany w serwerze echo z API Java NIO.

Obiekty ServerSocketChannel i SocketChannel są domyślnie skonfigurowane w trybie blokowania. Kanał Serwerowy.metoda accept blokuje i zwraca obiekt SocketChannel, gdy połączenie jest akceptowane. Serwersocket.odczyt metody blokuje się, dopóki dane wejściowe nie będą dostępne lub klient nie zostanie odłączony. Serwersocket.metoda write blokuje, dopóki nie zostaną zapisane wszystkie dane wyjściowe.

public class NioBlockingEchoServer { public static void main(String args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 7000)); while (active) {
SocketChannel socketChannel = serverSocketChannel.accept(); // blocking ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
int read = socketChannel.read(buffer); // blocking
if (read < 0) {
break;
} buffer.flip();
socketChannel.write(buffer); // blocking
} socketChannel.close();
} serverSocketChannel.close();
}
}

nieblokujący serwer NIO echo

w poniższym przykładzie nieblokujący model We/Wy jest zaimplementowany w serwerze echo z API Java NIO.

Obiekty ServerSocketChannel i SocketChannel są jawnie skonfigurowane w trybie nieblokującym. Kanał Serwerowy.metoda accept nie blokuje i zwraca null, jeśli nie zostało jeszcze zaakceptowane połączenie lub obiekt SocketChannel w przeciwnym razie. Serwersocket.read nie blokuje i zwraca 0, jeśli nie są dostępne dane lub dodatnią liczbę odczytywanych bajtów. Serwersocket.metoda write nie blokuje, jeśli w buforze wyjściowym gniazda jest wolne miejsce.

public class NioNonBlockingEchoServer { public static void main(String args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(7000)); while (active) {
SocketChannel socketChannel = serverSocketChannel.accept(); // non-blocking
if (socketChannel != null) {
socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
int read = socketChannel.read(buffer); // non-blocking
if (read < 0) {
break;
} buffer.flip();
socketChannel.write(buffer); // can be non-blocking
} socketChannel.close();
}
} serverSocketChannel.close();
}
}

multipleksowanie serwer NIO echo

w poniższym przykładzie model I/O multipleksowania jest zaimplementowany w interfejsie API NIO Java serwera echo.

podczas inicjalizacji wiele obiektów ServerSocketChannel, które są skonfigurowane w trybie nieblokującym, są rejestrowane w tym samym obiekcie selektora za pomocą klucza wyboru.Op_accept argument określający, że zdarzenie akceptacji połączenia jest interesujące.

w pętli głównej Selektor.wybierz metodę blokuje, dopóki nie wystąpi co najmniej jedno z zarejestrowanych zdarzeń. Następnie Selektor.metoda selectedKeys zwraca zestaw obiektów SelectionKey, dla których wystąpiły zdarzenia. Iteracja za pomocą obiektów SelectionKey umożliwia określenie, jakie zdarzenie We / Wy (connect, accept, read, write) miało miejsce i które obiekty socketchannel (ServerSocketChannel, SocketChannel) zostały powiązane z tym zdarzeniem.

wskazanie klucza wyboru, że kanał jest gotowy do jakiejś operacji, jest podpowiedzią, a nie gwarancją.

public class NioMultiplexingEchoServer { public static void main(String args) throws IOException {
final int ports = 8;
ServerSocketChannel serverSocketChannels = new ServerSocketChannel; Selector selector = Selector.open(); for (int p = 0; p < ports; p++) {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannels = serverSocketChannel;
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("localhost", 7000 + p)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} while (active) {
selector.select(); // blocking Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = keysIterator.next(); if (key.isAcceptable()) {
accept(selector, key);
}
if (key.isReadable()) {
keysIterator.remove();
read(selector, key);
}
if (key.isWritable()) {
keysIterator.remove();
write(key);
}
}
} for (ServerSocketChannel serverSocketChannel : serverSocketChannels) {
serverSocketChannel.close();
}
}
}

gdy obiekt SelectionKey wskazuje, że doszło do zdarzenia akceptacji połączenia, tworzy się kanał ServerSocketChannel.Zaakceptuj połączenie (które może być nieblokujące), aby zaakceptować połączenie. Następnie nowy obiekt SocketChannel jest konfigurowany w trybie nieblokującym i jest rejestrowany w tym samym obiekcie selektora za pomocą klucza SelectionKey.Op_read argument określający, że teraz Zdarzenie odczytu jest interesujące.

private static void accept(Selector selector, SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept(); // can be non-blocking
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
}

gdy obiekt SelectionKey wskazuje, że nastąpiło zdarzenie odczytu, tworzy kanał SocketChannel.odczyt wywołania (które może być nieblokujące) do odczytu danych z obiektu SocketChannel do nowego obiektu ByteByffer. Następnie obiekt SocketChannel jest rejestrowany w tym samym obiekcie selektora za pomocą klucza SelectionKey.Op_write argument określający, że teraz Zdarzenie zapisu jest interesujące. Dodatkowo ten obiekt Bajtbuffer jest używany podczas rejestracji jako załącznik.

private static void read(Selector selector, SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer); // can be non-blocking buffer.flip();
socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
}

gdy obiekt SelectionKeys wskazuje, że doszło do zdarzenia zapisu, tworzy się kanał SocketChannel.write call (które może być nieblokujące)do zapisu danych do obiektu SocketChannel z obiektu ByteByffer, wyodrębnionego z klucza wyboru.metoda mocowania. Potem kanał SocketChannel.połączenie cloase zamyka połączenie.

private static void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); socketChannel.write(buffer); // can be non-blocking
socketChannel.close();
}

po każdym odczycie lub zapisie obiekt SelectionKey jest usuwany z zestawu obiektów SelectionKey, aby zapobiec jego ponownemu użyciu. Ale obiekt SelectionKey do akceptacji połączenia nie jest usuwany, aby mieć możliwość wykonania następnej podobnej operacji.

asynchroniczny serwer echo nio2

w poniższym przykładzie asynchroniczny model We/Wy jest zaimplementowany w serwerze echo z API Java NIO2. Klasy AsynchronousServerSocketChannel, asynchronoussocketchannel są tutaj używane z mechanizmem obsługi dopełnienia.

Kanał AsynchronousServerSocketChannel.metoda accept inicjuje operację akceptacji połączenia asynchronicznego.

public class Nio2CompletionHandlerEchoServer { public static void main(String args) throws IOException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(7000)); AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(serverSocketChannel);
serverSocketChannel.accept(null, acceptCompletionHandler); System.in.read();
}
}

gdy połączenie jest akceptowane (lub operacja nie powiedzie się), wywoływana jest klasa AcceptCompletionHandler, która przez AsynchronousSocketChannel.read (ByteBuffer destination, a attachment, CompletionHandler<Integer,? Super A> handler) metoda inicjuje asynchroniczną operację odczytu z obiektu Asynchroussocketchannel do nowego obiektu bufora bajtów.

class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> { private final AsynchronousServerSocketChannel serverSocketChannel; AcceptCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
} @Override
public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
serverSocketChannel.accept(null, this); // non-blocking ByteBuffer buffer = ByteBuffer.allocate(1024);
ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(socketChannel, buffer);
socketChannel.read(buffer, null, readCompletionHandler); // non-blocking
} @Override
public void failed(Throwable t, Void attachment) {
// exception handling
}
}

gdy operacja read zakończy się (lub zakończy się niepowodzeniem), wywoływana jest klasa ReadCompletionHandler, która przez AsynchronousSocketChannel.write (ByteBuffer source, a attachment, CompletionHandler<Integer,? Super A> handler) metoda inicjuje operację asynchronicznego zapisu do obiektu Asynchroussocketchannel z obiektu Bajtbuffer.

class ReadCompletionHandler implements CompletionHandler<Integer, Void> { private final AsynchronousSocketChannel socketChannel;
private final ByteBuffer buffer; ReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer buffer) {
this.socketChannel = socketChannel;
this.buffer = buffer;
} @Override
public void completed(Integer bytesRead, Void attachment) {
WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(socketChannel);
buffer.flip();
socketChannel.write(buffer, null, writeCompletionHandler); // non-blocking
} @Override
public void failed(Throwable t, Void attachment) {
// exception handling
}
}

gdy operacja zapisu zakończy się (lub zakończy się niepowodzeniem), wywoływana jest klasa WriteCompletionHandler, która przy pomocy AsynchronousSocketChannel.metoda close zamyka połączenie.

class WriteCompletionHandler implements CompletionHandler<Integer, Void> { private final AsynchronousSocketChannel socketChannel; WriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
} @Override
public void completed(Integer bytesWritten, Void attachment) {
try {
socketChannel.close();
} catch (IOException e) {
// exception handling
}
} @Override
public void failed(Throwable t, Void attachment) {
// exception handling
}
}

w tym przykładzie asynchroniczne operacje We/Wy są wykonywane bez przywiązania, ponieważ wszystkie niezbędne obiekty (Asynchroussocketchannel, ByteBuffer) są przekazywane jako argumenty konstruktora dla odpowiednich programów obsługi uzupełniania.

wniosek

wybór modelu We/Wy dla komunikacji gniazd zależy od parametrów ruchu. Jeśli żądania We/Wy są długie i rzadkie, asynchroniczne We / Wy są na ogół dobrym wyborem. Jednakże, jeśli żądania We/Wy są krótkie i szybkie, narzut przetwarzania wywołań jądra może znacznie poprawić synchroniczne We / Wy.

pomimo tego, że Java zapewnia standardowy sposób wykonywania gniazd We / Wy w różnych systemach operacyjnych, rzeczywista wydajność może się znacznie różnić w zależności od ich implementacji. Możliwe jest rozpoczęcie badania tych różnic za pomocą dobrze znanego artykułu dana Kegla problem C10K.

kompletne przykłady kodu są dostępne w repozytorium GitHub.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.