Go to Rust: #9. 비동기 프로그래밍
이 글은 Claude Opus 4.5 을 이용해 초안이 작성되었으며, 이후 퇴고를 거쳤습니다.
Rust의 async/await는 Go의 goroutine과 근본적으로 다른 접근 방식입니다. Go는 런타임이 모든 것을 관리하지만, Rust는 명시적인 비동기 모델을 사용합니다. 이 섹션에서는 Rust의 비동기 생태계와 Tokio 런타임을 학습합니다.
9.1 비동기의 필요성#
동기 vs 비동기#
동기 (Synchronous):
// 순차적 실행, 블로킹
fn fetch_data() -> Data {
let response = http_get("https://api.example.com"); // 대기
parse(response)
}
비동기 (Asynchronous):
// 대기 중 다른 작업 가능
async fn fetch_data() -> Data {
let response = http_get("https://api.example.com").await; // 양보
parse(response)
}
Go의 goroutine 모델#
Go는 런타임이 내장되어 있어 동기 코드처럼 보이지만 비동기로 동작합니다:
// Go: 동기처럼 보이지만 비동기
func fetchData() Data {
// 내부적으로 런타임이 다른 goroutine으로 전환
response := httpGet("https://api.example.com")
return parse(response)
}
func main() {
go fetchData() // goroutine 생성
go fetchData() // 동시 실행
// ...
}
Rust의 async 모델#
Rust는 명시적입니다:
// Rust: 명시적 async/await
async fn fetch_data() -> Data {
let response = http_get("https://api.example.com").await;
parse(response)
}
#[tokio::main]
async fn main() {
let handle1 = tokio::spawn(fetch_data());
let handle2 = tokio::spawn(fetch_data());
let (result1, result2) = tokio::join!(handle1, handle2);
}
Go vs Rust 비동기 비교#
| 측면 | Go | Rust |
|---|---|---|
| 런타임 | 내장 | 선택 (tokio, async-std) |
| 코드 스타일 | 동기처럼 보임 | 명시적 async/await |
| 스케줄러 | 자동 | 런타임에 위임 |
| 컬러링 | 없음 | 있음 (async 전염) |
| 블로킹 코드 | 자동 처리 | 명시적 처리 필요 |
언제 스레드, 언제 async#
스레드 사용:
- CPU 바운드 작업
- 블로킹 라이브러리 사용
- 단순한 병렬 처리
async 사용:
- IO 바운드 작업
- 많은 동시 연결 처리
- 네트워크 서버
// CPU 바운드: 스레드 또는 rayon
let handles: Vec<_> = (0..4)
.map(|i| std::thread::spawn(move || heavy_computation(i)))
.collect();
// IO 바운드: async
async fn handle_connections() {
// 수천 개의 동시 연결 처리 가능
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(handle_connection(socket));
}
}
9.2 Future 트레이트#
Future란?#
Future는 아직 완료되지 않은 비동기 계산을 나타냅니다:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // 완료됨
Pending, // 아직 진행 중
}
지연 실행 (Lazy Execution)#
중요: Rust의 Future는 게으릅니다. await하거나 런타임에 제출하기 전까지 실행되지 않습니다:
async fn say_hello() {
println!("Hello!");
}
fn main() {
let future = say_hello(); // 아무것도 출력 안 됨!
// future는 그냥 값일 뿐, 실행되지 않음
}
#[tokio::main]
async fn main() {
let future = say_hello(); // 아직 실행 안 됨
future.await; // 이제 "Hello!" 출력
}
Go와 대조:
// Go: goroutine은 즉시 실행 시작
func sayHello() {
fmt.Println("Hello!")
}
func main() {
go sayHello() // 즉시 실행 시작됨
time.Sleep(time.Second)
}
9.3 async/await 기초#
async fn: 비동기 함수 정의#
// async 함수는 Future를 반환
async fn fetch_user(id: u32) -> User {
// 비동기 작업
User { id, name: String::from("Alice") }
}
// 위 함수는 대략 이것과 같음
fn fetch_user(id: u32) -> impl Future<Output = User> {
async move {
User { id, name: String::from("Alice") }
}
}
.await: Future 실행#
async fn process() {
let user = fetch_user(1).await; // 완료까지 대기
println!("User: {}", user.name);
}
비동기 블록#
async fn example() {
// 비동기 블록
let future = async {
let a = fetch_a().await;
let b = fetch_b().await;
a + b
};
let result = future.await;
}
async move#
클로저처럼 소유권 이동이 필요할 때:
async fn process(data: String) {
let future = async move {
// data의 소유권을 가져옴
println!("{}", data);
};
future.await;
// println!("{}", data); // 에러! data는 이동됨
}
에러 처리와 ?#
async fn fetch_and_process() -> Result<Data, Error> {
let response = fetch("https://api.example.com").await?;
let data = parse(response)?;
Ok(data)
}
9.4 Tokio 런타임#
런타임이 필요한 이유#
Rust의 async/await는 언어 기능일 뿐, 실행을 위한 런타임이 필요합니다:
// 이것만으로는 실행 안 됨
async fn hello() {
println!("Hello");
}
fn main() {
// hello(); // Future만 생성, 실행 안 됨
}
Tokio 설치와 설정#
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
#[tokio::main] 매크로#
#[tokio::main]
async fn main() {
println!("Hello from async main!");
}
// 위 코드는 대략 이것과 같음
fn main() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async {
println!("Hello from async main!");
});
}
tokio::spawn: 태스크 생성#
Go의 go 키워드와 유사:
// Go
go func() {
// 동시 실행
}()
// Rust
tokio::spawn(async {
// 동시 실행
});
전체 예제:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
sleep(Duration::from_secs(1)).await;
"task completed"
});
println!("Waiting...");
let result = handle.await.unwrap();
println!("Result: {}", result);
}
Go 런타임과의 비교#
| 측면 | Go | Tokio |
|---|---|---|
| 초기화 | 자동 | #[tokio::main] 또는 수동 |
| 스케줄러 | 내장, M:N | 작업 훔치기 (work stealing) |
| 스택 | 동적 성장 | 상태 머신 (스택리스) |
| 생성 비용 | 매우 낮음 | 낮음 |
9.5 비동기 IO#
tokio::fs: 파일 시스템#
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn file_operations() -> std::io::Result<()> {
// 파일 읽기
let content = fs::read_to_string("input.txt").await?;
// 파일 쓰기
fs::write("output.txt", "Hello, World!").await?;
// 스트림으로 읽기
let mut file = fs::File::open("input.txt").await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
Ok(())
}
tokio::net: 네트워크#
TCP 서버:
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, addr) = listener.accept().await?;
println!("Connection from: {}", addr);
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await.unwrap();
if n == 0 { break; }
socket.write_all(&buf[..n]).await.unwrap();
}
});
}
}
TCP 클라이언트:
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn connect() -> std::io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.write_all(b"Hello!").await?;
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
Ok(())
}
Go 비교:
// Go TCP 서버
func main() {
listener, _ := net.Listen("tcp", ":8080")
for {
conn, _ := listener.Accept()
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
buf := make([]byte, 1024)
for {
n, _ := conn.Read(buf)
if n == 0 { break }
conn.Write(buf[:n])
}
}
9.6 동시 실행#
tokio::join!: 모두 완료 대기#
use tokio::time::{sleep, Duration};
async fn fetch_user() -> User { /* ... */ }
async fn fetch_posts() -> Vec<Post> { /* ... */ }
async fn fetch_comments() -> Vec<Comment> { /* ... */ }
#[tokio::main]
async fn main() {
// 순차 실행 (느림)
let user = fetch_user().await;
let posts = fetch_posts().await;
let comments = fetch_comments().await;
// 동시 실행 (빠름)
let (user, posts, comments) = tokio::join!(
fetch_user(),
fetch_posts(),
fetch_comments()
);
}
tokio::select!: 첫 번째 완료 대기#
Go의 select와 유사:
// Go
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
case <-time.After(time.Second):
fmt.Println("timeout")
}
// Rust
use tokio::select;
use tokio::time::{sleep, Duration};
async fn race_example() {
let mut ch1 = /* ... */;
let mut ch2 = /* ... */;
select! {
msg = ch1.recv() => println!("ch1: {:?}", msg),
msg = ch2.recv() => println!("ch2: {:?}", msg),
_ = sleep(Duration::from_secs(1)) => println!("timeout"),
}
}
타임아웃#
use tokio::time::{timeout, Duration};
async fn with_timeout() -> Result<Data, Error> {
match timeout(Duration::from_secs(5), fetch_data()).await {
Ok(result) => result,
Err(_) => Err(Error::Timeout),
}
}
취소#
Rust에서 Future를 드롭하면 취소됩니다:
async fn cancellation_example() {
let handle = tokio::spawn(async {
loop {
// 장시간 작업
tokio::time::sleep(Duration::from_secs(1)).await;
println!("tick");
}
});
// 2초 후 취소
tokio::time::sleep(Duration::from_secs(2)).await;
handle.abort(); // 태스크 취소
}
9.7 비동기 채널#
tokio::sync::mpsc#
Go의 channel과 유사:
// Go
ch := make(chan int, 100)
go func() {
ch <- 42
}()
value := <-ch
// Rust
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
tx.send(42).await.unwrap();
});
let value = rx.recv().await.unwrap();
println!("{}", value);
}
채널 종류#
| 채널 | 용도 | Go 대응 |
|---|---|---|
mpsc::channel |
다중 생산자, 단일 소비자 | chan T |
mpsc::unbounded_channel |
무제한 버퍼 | make(chan T) |
oneshot::channel |
일회성 | - |
broadcast::channel |
브로드캐스트 | - |
watch::channel |
최신 값 공유 | - |
oneshot: 일회성 채널#
use tokio::sync::oneshot;
async fn compute_result(tx: oneshot::Sender<i32>) {
let result = heavy_computation();
tx.send(result).unwrap();
}
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(compute_result(tx));
let result = rx.await.unwrap();
println!("Result: {}", result);
}
broadcast: 브로드캐스트#
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, _) = broadcast::channel(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
println!("rx1: {}", rx1.recv().await.unwrap());
});
tokio::spawn(async move {
println!("rx2: {}", rx2.recv().await.unwrap());
});
tx.send(42).unwrap(); // 모든 구독자에게 전송
}
9.8 동기화 프리미티브#
tokio::sync::Mutex#
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let mut lock = counter.lock().await; // 비동기 락
*lock += 1;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("Counter: {}", *counter.lock().await);
}
주의: std::sync::Mutex를 .await 포인트 넘어서 들고 있으면 안 됩니다:
// 위험! std::sync::Mutex를 await 넘어서 보유
let lock = std_mutex.lock().unwrap();
some_async_fn().await; // 다른 태스크가 같은 스레드에서 락 시도 시 데드락
drop(lock);
// 안전: tokio::sync::Mutex 사용
let lock = tokio_mutex.lock().await;
some_async_fn().await; // OK
drop(lock);
Semaphore: 동시성 제한#
use tokio::sync::Semaphore;
use std::sync::Arc;
async fn limited_concurrency() {
let semaphore = Arc::new(Semaphore::new(3)); // 최대 3개 동시 실행
let mut handles = vec![];
for i in 0..10 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let handle = tokio::spawn(async move {
println!("Task {} started", i);
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Task {} done", i);
drop(permit); // 허가 반환
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
9.9 실전 비동기 패턴#
HTTP 클라이언트: reqwest#
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct User {
id: u32,
name: String,
}
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let user: User = reqwest::get("https://api.example.com/user/1")
.await?
.json()
.await?;
println!("{:?}", user);
Ok(())
}
웹 서버: axum#
[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
use axum::{
routing::{get, post},
Router, Json,
extract::Path,
};
use serde::{Deserialize, Serialize};
#[derive(Serialize)]
struct User {
id: u32,
name: String,
}
#[derive(Deserialize)]
struct CreateUser {
name: String,
}
async fn get_user(Path(id): Path<u32>) -> Json<User> {
Json(User { id, name: String::from("Alice") })
}
async fn create_user(Json(payload): Json<CreateUser>) -> Json<User> {
Json(User { id: 1, name: payload.name })
}
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/users/:id", get(get_user))
.route("/users", post(create_user));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Go의 net/http와 비교:
// Go
func getUser(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
json.NewEncoder(w).Encode(User{ID: id, Name: "Alice"})
}
func main() {
r := chi.NewRouter()
r.Get("/users/{id}", getUser)
http.ListenAndServe(":3000", r)
}
그레이스풀 셧다운#
use tokio::signal;
#[tokio::main]
async fn main() {
let app = Router::new().route("/", get(|| async { "Hello" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
}
async fn shutdown_signal() {
signal::ctrl_c().await.expect("failed to listen for ctrl+c");
println!("Shutting down...");
}
9.10 요약#
Go vs Rust 비동기 비교#
| 측면 | Go | Rust (Tokio) |
|---|---|---|
| 런타임 | 내장 | 외부 (선택) |
| 동시 실행 | go func() |
tokio::spawn() |
| 채널 | chan T |
mpsc::channel |
| 선택적 수신 | select {} |
tokio::select! |
| 타임아웃 | time.After |
tokio::time::timeout |
| HTTP 서버 | net/http |
axum, actix-web |
async 사용 시 주의사항#
- 블로킹 코드:
spawn_blocking으로 래핑 - std Mutex: await 넘어서 보유 금지
- CPU 바운드:
rayon또는spawn_blocking사용 - Send 바운드:
tokio::spawn에 전달하는 Future는 Send여야 함
연습 문제#
-
기본 async: 1초 후 “Hello"를 출력하는 비동기 함수를 작성하세요.
-
동시 실행: 3개의 API를 동시에 호출하고 결과를 합치는 함수를 작성하세요.
-
타임아웃: 5초 타임아웃이 있는 HTTP 요청 함수를 작성하세요.
-
채널: 생산자-소비자 패턴을
mpsc로 구현하세요. -
간단한 서버: axum으로 JSON을 반환하는 REST API를 작성하세요.
-
동시성 제한: Semaphore를 사용해 최대 5개의 동시 요청만 허용하는 함수를 작성하세요.