1. 什么是管道#
管道是一种用于 进程间通信 (IPC) 的机制,它允许一个进程的输出直接作为下一个进程的输入,它是一种通过 内核缓冲区 实现的通信方式。
它的特点:
- 单向通信:标准管道中,数据只能沿一个方向流动(双向通信可以用命名管道实现)
- 半双工:数据可以在两个方向上流动,但不能同时进行
- 临时性:管道通常用于短期的进程间通信
- 内核缓冲区:数据通过内核缓冲区传递,而不是共享内存的方式
2. 使用重定向 IO 的方式实现管道#
我们要实现一个类似 ps -aux | grep top
的功能,在 Golang
中我们可以使用下面的方式。
简单的来说,就是我们创建一个缓冲区,然后重定向进程的 stdin/stdout
到这个缓冲区,这样就实现了共享内存的通信方法。
Tip
在这里,我们缓冲区使用 Golang
bytes
包中的bytes.Buffer
对象,他提供了一个缓冲区,用于高效的构建和操作字节切片,它可以动态增长,适用于构建和处理频繁拼接和修改的字节数据。
package main
import (
"bytes"
"fmt"
"os/exec"
)
func main() {
cmd1 := exec.Command("ps", "aux")
cmd2 := exec.Command("grep", "xxx")
var outputBuf1 bytes.Buffer
cmd1.Stdout = &outputBuf1
if err := cmd1.Start(); err != nil {
fmt.Printf("Error: The first command can't be start up %s\n", err)
return
}
if err := cmd1.Wait(); err != nil {
fmt.Printf("Error: Couldn't wait for the first command: %s\n", err)
return
}
cmd2.Stdin = &outputBuf1
var outputBuf2 bytes.Buffer
cmd2.Stdout = &outputBuf2
if err := cmd2.Start(); err != nil {
fmt.Printf("Error: The second command can't be start up %s\n", err)
return
}
if err := cmd2.Wait(); err != nil {
fmt.Printf("Error: Couldn't wait for the second command: %s\n", err)
return
}
fmt.Printf("%s\n", outputBuf2.String())
}
3. Golang 中管道的用法#
Golang 中提供了 2 种管道的使用方法,os.Pipe()
和 io.Pipe()
,这是两种不同的实现方法,前者依赖操作系统的管道机制,后者是使用 Golang
实现的,他们都是匿名管道。
特性 | os.Pipe() | io.Pipe() |
---|---|---|
实现层面 | 操作系统级别管道,使用底层系统调用创建。 | 纯 Go 实现的内存管道,不涉及操作系统调用。 |
使用场景 | 适合与外部进程通信或在不同的操作系统线程间通信。 | 主要用于同一 Go 程序的不同 Goroutine 间的数据传递。 |
性能 | 对于大量数据传输,较高效,利用操作系统的缓冲区。 | 小数据量传输较快,但不适合大量数据传输。 |
跨平台兼容性 | 行为可能因操作系统不同而异。 | 跨平台行为一致,因为纯 Go 实现。 |
文件描述符 | 返回 *os.File 类型,包含底层文件描述符。 | 返回 io.Reader 和 io.Writer 接口,不涉及文件描述符。 |
关闭行为 | 需要手动关闭读端和写端的文件描述符。 | 一端关闭时,另一端自动返回 EOF。 |
多路复用 | 支持操作系统级多路复用(如 select 、poll 或 epoll ),适合处理多个 I/O 源。 | 不直接支持操作系统多路复用,但可通过 channel 和 select 实现类似效果。 |
原子操作 | 操作系统保证小于等于 PIPE_BUF (通常为 4096 字节)的写入操作是原子的。大于此的写操作可能被分割。 | 所有写操作原子性由 Go 运行时确保,利用互斥锁保证并发安全。 |
- os.Pipe() 更适合用于系统级别的任务,比如跨进程通信、标准输入输出的重定向等。
- io.Pipe() 更适合 Go 内部的并发编程,用于 Goroutine 间的数据流传递,可以与 Go 的并发特性(如
channel
、select
)无缝结合。
3.1 os.Pipe()#
package main
import (
"bytes"
"fmt"
"os"
"sync"
)
func main() {
reader, writer, err := os.Pipe()
var wg sync.WaitGroup
if err != nil {
fmt.Printf("Error creating pipe: %v\n", err)
return
}
wg.Add(1)
// 管道这里不管是读还是写都存在阻塞
go func() {
defer wg.Done()
output := make([]byte, 64)
n, err := reader.Read(output)
if err != nil {
fmt.Printf("Error reading from pipe: %v\n", err)
return
}
fmt.Printf("Read %d bytes\n", n)
}()
var inputs bytes.Buffer
for i := 65; i <= 90; i++ {
inputs.WriteByte(byte(i))
}
n, err := writer.Write(inputs.Bytes())
if err != nil {
fmt.Printf("Error writing to pipe: %v\n", err)
return
}
fmt.Printf("Wrote %d bytes\n", n)
wg.Wait()
}
3.2 io.Pipe()#
io.Pipe()
是一个纯内存管道,由 Go 语言在内存中实现。其性能限制主要来源于以下几个方面:
-
内存缓冲:
io.Pipe()
没有底层操作系统的支持,而是通过 Go 的缓冲和同步机制在内存中实现。由于没有直接的内核支持,它需要通过互斥锁(Mutex)和条件变量(Cond)来实现读写同步和阻塞,这在高频、大量数据传输时会成为性能瓶颈。 -
Goroutine 调度开销:
io.Pipe()
的设计目标是在 Goroutine 间传递数据。因此,数据传递和阻塞唤醒都在 Go 运行时的 Goroutine 调度中进行。频繁的数据传递或大量 Goroutine 的场景下,这种调度开销会降低性能。 -
系统级缓冲缺乏:操作系统的内核层面通常会为文件描述符分配缓冲区(例如
os.Pipe()
),而io.Pipe()
缺乏此支持。在传输大量数据时,由于没有内核缓冲,数据需要在内存中反复读写,增加了内存分配和垃圾回收的负担。
io.Pipe()
的使用方法和os.Pipe()
基本一致,例子如下:
package main
import (
"bytes"
"fmt"
"io"
"sync"
)
func main() {
reader, writer := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
output := make([]byte, 256)
n, err := reader.Read(output)
if err != nil {
fmt.Printf("Error reading from reader: %v\n", err)
return
}
fmt.Printf("Read %d bytes\n", n)
}()
var inputs bytes.Buffer
for i := 65; i <= 90; i++ {
inputs.WriteByte(byte(i))
}
n, err := writer.Write(inputs.Bytes())
if err != nil {
fmt.Printf("Error writing to writer: %v\n", err)
return
}
fmt.Printf("Wrote %d bytes\n", n)
wg.Wait()
}