Rick Sanchez

OS && DB enthusiast, deep learning model tuner, humble retired ACMer, anime fan.

Summary of Pipe Usage in Golang

1. What is a Pipe

A pipe is a mechanism for inter-process communication (IPC) that allows the output of one process to be used directly as the input of the next process. It is a communication method implemented through the kernel buffer.

Its characteristics:

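  1. Unidirectional communication: In a standard pipe, data flows in one direction only, from the write end to the read end. Bidirectional communication requires two pipes, one per direction.
  2. Half-duplex: Because a single pipe carries data in only one direction, two processes cannot both send over it at the same time.
  3. Temporality: An anonymous pipe lives only as long as the processes that hold its ends open, so pipes are typically used for short-term inter-process communication.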
  4. Kernel buffer: Data is passed through the kernel buffer rather than through shared memory.

2. Implementing Pipes Using Redirected IO

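We want to reproduce the shell pipeline ps aux | grep top. In Golang, we can use the following method.

In simple terms, we create a buffer inside our own process, redirect the first command's stdout into it, and then hand the same buffer to the second command as its stdin. Strictly speaking, this imitates a pipe rather than creating one: the first command must finish before the second one starts, so nothing is streamed between the two processes.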

Tip

Here, we use the bytes.Buffer object from the Golang bytes package as our buffer. It provides a buffer for efficiently building and manipulating byte slices, and it can grow dynamically, making it suitable for building and handling frequently concatenated and modified byte data.

package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

func main() {
	cmd1 := exec.Command("ps", "aux")
	cmd2 := exec.Command("grep", "xxx")

	var outputBuf1 bytes.Buffer
	cmd1.Stdout = &outputBuf1

	if err := cmd1.Start(); err != nil {
		fmt.Printf("Error: The first command couldn't be started: %s\n", err)
		return
	}
	if err := cmd1.Wait(); err != nil {
		fmt.Printf("Error: Couldn't wait for the first command: %s\n", err)
		return
	}

	cmd2.Stdin = &outputBuf1
	var outputBuf2 bytes.Buffer
	cmd2.Stdout = &outputBuf2
	if err := cmd2.Start(); err != nil {
		fmt.Printf("Error: The second command couldn't be started: %s\n", err)
		return
	}

	// grep exits with status 1 when no line matches, and Wait reports that as an error.
	if err := cmd2.Wait(); err != nil {
		fmt.Printf("Error: Couldn't wait for the second command: %s\n", err)
		return
	}

	fmt.Printf("%s\n", outputBuf2.String())
}

3. Usage of Pipes in Golang

Golang provides two ways to create a pipe: os.Pipe() and io.Pipe(). They are separate implementations: the former wraps the operating system's pipe mechanism, while the latter is implemented entirely in Go. Both are anonymous pipes.

| Feature | os.Pipe() | io.Pipe() |
| --- | --- | --- |
| Implementation Level | Operating system-level pipe created using low-level system calls. | Pure Go implementation of an in-memory pipe, not involving system calls. |
| Usage Scenario | Suitable for communication with external processes, or anywhere a file descriptor is required. | Mainly used for data transfer between different Goroutines in the same Go program. |
| Performance | Generally better suited to large data transfers: writes land in the kernel buffer, so the writer can run ahead of the reader. | Cheap for small transfers because no system calls are involved, but unbuffered: each Write blocks until Reads have consumed all of its data, which can limit throughput on large transfers. |
| Cross-Platform Compatibility | Behavior may vary depending on the operating system. | Consistent cross-platform behavior due to pure Go implementation. |
| Return Types | Returns two *os.File values (read end and write end) plus an error; each wraps an underlying file descriptor. | Returns *io.PipeReader and *io.PipeWriter, not involving file descriptors. |
| Close Behavior | Both file descriptors must be closed manually. The reader sees EOF only after every copy of the write end has been closed. | Closing the writer makes Read return io.EOF; closing the reader makes Write return io.ErrClosedPipe. |
| Multiplexing | Supports operating system-level multiplexing (e.g., select, poll, or epoll), suitable for handling multiple I/O sources. | Does not directly support OS multiplexing but can achieve similar effects through Goroutines, channels, and select. |
| Atomic Operations | The operating system guarantees that writes of at most PIPE_BUF bytes are atomic (4096 bytes on Linux; POSIX requires at least 512). Larger writes may be interleaved with data from other writers. | Concurrent Write calls are safe and are serialized, so data from separate calls is not interleaved. |

  • os.Pipe() is more suitable for system-level tasks, such as inter-process communication and redirection of standard input and output.
  • io.Pipe() is more suitable for concurrent programming within Go, where it carries a data stream between Goroutines. It is not itself a channel, so using it together with select requires a Goroutine that reads from the pipe and forwards the data to a channel.

3.1 os.Pipe()

package main

import (
	"bytes"
	"fmt"
	"os"
	"sync"
)

func main() {
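	reader, writer, err := os.Pipe()
	if err != nil {
		fmt.Printf("Error creating pipe: %v\n", err)
		return
	}
	// Both ends are file descriptors and must be closed explicitly.
	defer reader.Close()
	defer writer.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	// Read blocks until data is available; Write blocks once the kernel buffer is full.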
	go func() {
		defer wg.Done()
		output := make([]byte, 64)
		n, err := reader.Read(output)
		if err != nil {
			fmt.Printf("Error reading from pipe: %v\n", err)
			return
		}
		fmt.Printf("Read %d bytes\n", n)
	}()

	var inputs bytes.Buffer
	for i := 65; i <= 90; i++ {
		inputs.WriteByte(byte(i))
	}
	n, err := writer.Write(inputs.Bytes())
	if err != nil {
		fmt.Printf("Error writing to pipe: %v\n", err)
		return
	}
	fmt.Printf("Wrote %d bytes\n", n)
	wg.Wait()
}

3.2 io.Pipe()

io.Pipe() is a pure in-memory pipe implemented in Go. Its performance limitations mainly arise from the following aspects:

  • No Internal Buffering: io.Pipe() has no underlying operating system support and, according to the io package documentation, no internal buffer. Data is copied directly from the slice passed to Write into the slice passed to Read, and each Write blocks until one or more Reads have consumed all of its data. This hand-off is synchronized with Go-level primitives, which can become a performance bottleneck during high-frequency, large data transfers.

  • Goroutine Scheduling Overhead: io.Pipe() is designed for data transfer between Goroutines, so every block and wake-up goes through the Go runtime's Goroutine scheduler. In scenarios with frequent data transfers or a large number of Goroutines, this scheduling overhead can reduce performance.

  • Lack of System-Level Buffering: The operating system's kernel allocates a buffer for an os.Pipe(), which lets the writer run ahead of the reader; io.Pipe() has nothing comparable. During large data transfers the writer and reader proceed in lockstep, one hand-off per Read, unless the program adds its own buffering (for example with bufio).

The usage of io.Pipe() is basically the same as os.Pipe(), as shown in the example below:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

func main() {
	reader, writer := io.Pipe()
	var wg sync.WaitGroup

	wg.Add(1)

	go func() {
		defer wg.Done()
		output := make([]byte, 256)
		n, err := reader.Read(output)
		if err != nil {
			fmt.Printf("Error reading from reader: %v\n", err)
			return
		}
		fmt.Printf("Read %d bytes\n", n)
	}()

	var inputs bytes.Buffer
	for i := 65; i <= 90; i++ {
		inputs.WriteByte(byte(i))
	}
	n, err := writer.Write(inputs.Bytes())
	if err != nil {
		fmt.Printf("Error writing to writer: %v\n", err)
		return
	}
	fmt.Printf("Wrote %d bytes\n", n)
	wg.Wait()
}

4. Multiplexing
