Linux IPC: UNIX Domain Socket
In the previous article, we discussed several facilities and mechanisms available on Linux to enable inter-process communication (IPC). One of them is sockets. Due to their flexibility and performance, sockets are widely used in IPC applications.
In this article, we will explore the UNIX Domain Socket which is used for inter-process communication on the same host system. We will focus on stream sockets.
UNIX Domain Socket
The socket address structure for a UNIX domain socket is as follows:
// Address structure passed to bind()/connect() for UNIX domain sockets.
struct sockaddr_un {
    sa_family_t sun_family; // Always AF_UNIX
    char sun_path[108];     // Null-terminated socket pathname
};

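The operation flow from the server side is as follows:
1. Create a socket with socket().
2. Bind the socket to a pathname with bind().
3. Mark the socket as a listening socket with listen().
4. Accept an incoming client connection with accept().
5. Exchange data with read()/write(), then close() the connection.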
The operation flow from the client side is as follows:
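1. Create a socket with socket().
2. Connect to the server's socket pathname with connect(), then exchange data with read()/write().

# Simple Server-Client Implementation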
Server
#include <sys/un.h>
#include <sys/socket.h>
#include <ctype.h>
...
#define BUF_SIZE 512
#define SV_SOCK_PATH "/tmp/unix_socket"
#define BACKLOG 5 // Maximum number of pending connections queued by listen()

// Iterative UNIX domain stream server.
//
// Binds a listening socket to SV_SOCK_PATH and then serves one client at a
// time: everything the client sends is copied to standard output until the
// client closes its end of the connection. Any failing call is reported on
// stderr and ends the program with exit status 1.
int main()
{
    std::cout << "Hello, Linux IPC and Socket Domain!\n";

    // Prints "<what>: <errno text>" and yields the exit status to return.
    // errno is saved first because the stream output may overwrite it.
    const auto fail = [](const char *what) {
        const int saved_errno = errno;
        std::cerr << what << ": " << strerror(saved_errno) << '\n';
        return 1;
    };

    const int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sfd == -1)
    {
        return fail("socket");
    }

    // Remove a socket file left over from a previous run; a missing file is fine.
    if (remove(SV_SOCK_PATH) == -1 && errno != ENOENT)
    {
        return fail("remove");
    }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX; // UNIX Domain address
    // The structure was zeroed above, so copying at most size-1 bytes keeps
    // sun_path null-terminated.
    strncpy(addr.sun_path, SV_SOCK_PATH, sizeof(addr.sun_path) - 1);

    if (bind(sfd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_un)) == -1)
    {
        return fail("bind");
    }
    if (listen(sfd, BACKLOG) == -1)
    {
        return fail("listen");
    }

    char buf[BUF_SIZE];

    // Handle client connections iteratively
    for (;;)
    {
        // Flush so the message appears before the raw write() output below.
        std::cout << "Waiting for a connection...\n" << std::flush;
        const int cfd = accept(sfd, nullptr, nullptr);
        if (cfd == -1)
        {
            return fail("accept");
        }

        // Transfer data: read() returns 0 once the client closes its end.
        ssize_t numRead;
        while ((numRead = read(cfd, buf, BUF_SIZE)) > 0)
        {
            if (write(STDOUT_FILENO, buf, numRead) != numRead)
            {
                return fail("partial/failed write");
            }
        }
        if (numRead == -1)
        {
            return fail("read");
        }

        if (close(cfd) == -1)
        {
            return fail("close");
        }
        std::cout << "Client disconnected.\n" << std::flush;
    }
}
Client
#include <sys/un.h>
#include <sys/socket.h>
#include <ctype.h>
#include <errno.h>
...
#define BUF_SIZE 512
#define SV_SOCK_PATH "/tmp/unix_socket"

/*
 * UNIX domain stream client.
 *
 * Connects to the server socket at SV_SOCK_PATH, forwards everything read
 * from standard input to the server and prints whatever reply is already
 * available after each chunk. Exits with EXIT_SUCCESS once stdin reaches
 * end-of-file (or the server closes the connection) and with EXIT_FAILURE
 * when a system call fails.
 *
 * argc/argv are accepted but not used.
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    /* Create client socket. It is non-blocking, so the read() of the reply
       below returns immediately when the server has sent nothing. */
    const int sfd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (sfd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    /* Construct server address, and make the connection */
    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    /* addr was zeroed above, so sun_path stays null-terminated. */
    strncpy(addr.sun_path, SV_SOCK_PATH, sizeof(addr.sun_path) - 1);
    if (connect(sfd, reinterpret_cast<struct sockaddr *>(&addr),
                sizeof(struct sockaddr_un)) == -1) {
        perror("connect");
        exit(EXIT_FAILURE);
    }

    /* Copy stdin to socket */
    char buf[BUF_SIZE];
    ssize_t numRead;
    while ((numRead = read(STDIN_FILENO, buf, BUF_SIZE)) > 0) {
        if (write(sfd, buf, numRead) != numRead) {
            perror("write");
            exit(EXIT_FAILURE);
        }

        /* Leave room for the terminating null byte added below. */
        const ssize_t numBytes = read(sfd, buf, BUF_SIZE - 1);
        if (numBytes == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                continue; /* No reply available yet; send the next chunk */
            perror("read from server");
            exit(EXIT_FAILURE);
        }
        if (numBytes == 0)
            break; /* Server closed the connection */

        printf("Read %zd bytes from server: ", numBytes);
        buf[numBytes] = '\0'; /* Null-terminate the string */
        printf("Received from server: %s\n", buf);
    }
    if (numRead == -1) {
        perror("read");
        exit(EXIT_FAILURE);
    }
    exit(EXIT_SUCCESS); /* Closes our socket; server sees EOF */
}
Concurrent Server Implementation
The previous example is only a simple server-client implementation. It supports only one client connection at a time. Of course, this is not very usable in the real world. In practice, we expect the server to be able to accept multiple connections at the same time. This also applies to communication between processes on the same host. So, how can we achieve this?
There are multiple implementation variants to build a concurrent server.
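This article shows two of them: a pool of pre-created server threads, and a single-process server using epoll(). You can find the implementation examples below.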
Pre-created Threads in Server Pool
// main.cpp
#include <iostream>
#include <sys/un.h>
#include <sys/socket.h>
#include <ctype.h>
#include <atomic>
#include <thread>
#include <vector>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <functional>
extern "C"
{
#include "lib/error_functions.h"
#include "lib/tlpi_hdr.h"
}
#define BUF_SIZE 10
#define SV_SOCK_PATH "/tmp/ud_ucase"
#define BACKLOG 5
#define MAX_THREADS 5

/**
 * Fixed-size pool of worker threads that run queued tasks.
 *
 * The constructor starts the workers; add_task() queues a callable and wakes
 * one worker. After stop() (or in the destructor) the workers finish the
 * tasks that are still queued and then exit; tasks added afterwards are
 * queued but never executed because no worker is left to run them.
 *
 * All public member functions may be called from any thread.
 */
class ServerPool {
public:
    /**
     * Starts the worker threads.
     * @param num_threads number of workers to create (defaults to MAX_THREADS).
     */
    ServerPool(size_t num_threads = MAX_THREADS)
    {
        threads.reserve(num_threads);
        for (size_t i = 0; i < num_threads; ++i) {
            threads.emplace_back(&ServerPool::worker_loop, this);
        }
    }

    ServerPool(const ServerPool &) = delete;
    ServerPool &operator=(const ServerPool &) = delete;

    /** Stops the pool and joins every worker; queued tasks are run first. */
    ~ServerPool() {
        stop();
        for (auto &thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    /**
     * Queues a task and wakes one worker.
     * @param task callable to execute on a worker thread.
     */
    void add_task(std::function<void()> task) {
        {
            std::cout << "Adding task to the queue. Current active threads: " << active_threads.load() << std::endl;
            std::unique_lock<std::mutex> lock(threads_mutex);
            tasks.push(std::move(task));
            std::cout << "Task added. Total tasks: " << tasks.size() << std::endl;
        }
        condition.notify_one();
    }

    /** @return number of workers currently executing a task. */
    int get_active_threads() const {
        return active_threads.load();
    }

    /** Asks the workers to exit once the queue is empty. Does not join them. */
    void stop() {
        {
            // The flag is changed under the mutex so a worker cannot test the
            // wait predicate, miss the change, and then sleep through the
            // notification below.
            std::lock_guard<std::mutex> lock(threads_mutex);
            running = false;
        }
        condition.notify_all();
    }

    /** Blocks until the queue is empty and no worker is executing a task. */
    void wait_for_completion() {
        std::unique_lock<std::mutex> lock(threads_mutex);
        idle_condition.wait(lock, [this] { return tasks.empty() && active_threads.load() == 0; });
    }

    /**
     * Discards every task that has not started yet.
     *
     * Tasks that are already executing keep running and stay counted in
     * get_active_threads(); each worker decrements the counter itself when
     * its task finishes, so the counter is not forced to zero here.
     */
    void reset() {
        {
            std::unique_lock<std::mutex> lock(threads_mutex);
            while (!tasks.empty()) {
                tasks.pop();
            }
        }
        // The queue just became empty, which may satisfy wait_for_completion().
        idle_condition.notify_all();
    }

private:
    /** Body of every worker thread: fetch a task, run it, repeat. */
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(threads_mutex);
                condition.wait(lock, [this] {
                    return !tasks.empty() || !running;
                });
                std::cout << "Thread " << std::this_thread::get_id() << " checking for tasks. Active threads: " << active_threads.load() << std::endl;
                if (!running && tasks.empty()) {
                    return; // Exit if no tasks and not running
                }
                task = std::move(tasks.front());
                tasks.pop();
                // Incremented under the same lock as the pop, so observers
                // holding the mutex never see "queue empty and nobody busy"
                // while a task is in flight.
                active_threads++;
            }
            try {
                std::cout << "Thread " << std::this_thread::get_id() << " executing task. Active threads: " << active_threads.load() << std::endl;
                task();
            } catch (const std::exception& e) {
                std::cerr << "Task execution error: " << e.what() << std::endl;
            } catch (...) {
                // An exception leaving the thread function would terminate the process.
                std::cerr << "Task execution error: unknown exception" << std::endl;
            }
            // Destroy the callable (and whatever it captured) before
            // announcing that the task is finished.
            task = nullptr;
            {
                std::lock_guard<std::mutex> lock(threads_mutex);
                active_threads--;
            }
            idle_condition.notify_all();
        }
    }

    std::vector<std::thread> threads;          // Worker threads, joined in the destructor
    std::queue<std::function<void()>> tasks;   // Pending tasks, guarded by threads_mutex
    std::condition_variable condition;         // Signals workers: new task or stop request
    std::condition_variable idle_condition;    // Signals waiters: a task finished or the queue was cleared
    std::mutex threads_mutex;                  // Guards tasks and changes to running/active_threads
    std::atomic<bool> running{true};           // Cleared by stop()
    std::atomic<int> active_threads{0};        // Workers currently executing a task
};
// Concurrent UNIX domain stream server backed by a ServerPool.
//
// The main thread only accepts connections. Each accepted client is handed
// to the pool as a task that logs the received bytes in upper case, echoes
// the received bytes back to the client unchanged, and closes the
// connection when the client is done.
//
// Set-up failures are reported with errExit(); failures that concern a
// single client are reported with errMsg() and only end that client's task.
int main()
{
    std::cout << "Hello, Linux IPC and Socket Domain!\n";

    const int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sfd == -1)
    {
        errExit("socket");
    }

    // Remove a socket file left over from a previous run; a missing file is fine.
    if (remove(SV_SOCK_PATH) == -1 && errno != ENOENT)
    {
        errExit("remove-%s", SV_SOCK_PATH);
    }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX; // UNIX Domain address
    // addr was zeroed above, so sun_path stays null-terminated.
    strncpy(addr.sun_path, SV_SOCK_PATH, sizeof(addr.sun_path) - 1);
    if (bind(sfd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_un)) == -1)
    {
        errExit("bind");
    }
    if (listen(sfd, BACKLOG) == -1)
    {
        errExit("listen");
    }

    ServerPool server_pool;

    // Accept clients forever; the per-client work runs on the pool.
    for (;;)
    {
        std::cout << "Waiting for a connection...\n";
        const int cfd = accept(sfd, nullptr, nullptr);
        if (cfd == -1)
        {
            if (errno == EINTR)
            {
                continue; // Interrupted by a signal, retry
            }
            errExit("accept");
        }
        std::cout << "Client connected.\n";

        server_pool.add_task([cfd]() {
            char buf[BUF_SIZE];
            ssize_t numRead;

            // Read data from the client until it closes its end (read() == 0).
            while ((numRead = read(cfd, buf, BUF_SIZE)) > 0)
            {
                std::cout << "Received " << numRead << " bytes: ";
                for (ssize_t i = 0; i < numRead; ++i)
                {
                    // toupper() requires a value representable as unsigned char.
                    std::cout << static_cast<char>(toupper(static_cast<unsigned char>(buf[i])));
                }
                std::cout << std::endl;

                // Echo back to the client. MSG_NOSIGNAL turns a write to a
                // vanished client into an error return instead of SIGPIPE.
                if (send(cfd, buf, numRead, MSG_NOSIGNAL) != numRead)
                {
                    errMsg("partial/failed write");
                    break;
                }
            }
            if (numRead == -1)
            {
                errMsg("read");
            }
            if (close(cfd) == -1) // Close the client socket
            {
                errMsg("close");
            }
            std::cout << "Client disconnected.\n";
        });
        std::cout << "Task added to server pool. Active threads: " << server_pool.get_active_threads() << std::endl;
    }
}
Single Process Concurrent Server using epoll()
// main.cpp
#include <iostream>
#include <sys/un.h>
#include <sys/socket.h>
#include <ctype.h>
#include <atomic>
#include <thread>
#include <vector>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <functional>
extern "C"
{
#include "lib/error_functions.h"
#include "lib/tlpi_hdr.h"
}
#include <sys/epoll.h>
#define BUF_SIZE 512
#define SV_SOCK_PATH "/tmp/ud_ucase"
#define BACKLOG 5
#define MAX_THREADS 5
#define MAX_BUF 512
#define MAX_EVENTS 10 // Maximum number of events fetched per epoll_wait() call

// Single-process concurrent UNIX domain stream server built on epoll.
//
// One epoll instance watches the listening socket and every connected
// client. A readable listening socket means a new connection to accept; a
// readable client socket delivers data that is converted to upper case and
// echoed back. A client is removed from the epoll set and closed when it
// reaches end-of-file, hangs up, or fails.
//
// Set-up failures are reported with errExit(); failures that concern a
// single client are reported with errMsg() and only close that client.
int main()
{
    std::cout << "Hello, Linux IPC and Socket Domain!\n";

    const int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sfd == -1)
    {
        errExit("socket");
    }

    // Remove a socket file left over from a previous run; a missing file is fine.
    if (remove(SV_SOCK_PATH) == -1 && errno != ENOENT)
    {
        errExit("remove-%s", SV_SOCK_PATH);
    }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX; // UNIX Domain address
    // addr was zeroed above, so sun_path stays null-terminated.
    strncpy(addr.sun_path, SV_SOCK_PATH, sizeof(addr.sun_path) - 1);
    if (bind(sfd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_un)) == -1)
    {
        errExit("bind");
    }
    if (listen(sfd, BACKLOG) == -1)
    {
        errExit("listen");
    }

    int connected_clients = 0;

    // Create an epoll
    const int epfd = epoll_create(1);
    if (epfd == -1)
    {
        errExit("epoll_create");
    }

    struct epoll_event ev;
    ev.events = EPOLLIN; // Interested in read events
    ev.data.fd = sfd;    // Monitor the server socket
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev) == -1)
    {
        errExit("epoll_ctl");
    }
    std::cout << "Server is listening on " << SV_SOCK_PATH << std::endl;

    struct epoll_event evlist[MAX_EVENTS];
    char buf[BUF_SIZE];

    // Event loop: wait for activity, then handle each ready descriptor.
    for (;;)
    {
        std::cout << "Waiting for a connection...\n";
        const int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
        if (ready == -1)
        {
            if (errno == EINTR)
            {
                continue; // Interrupted by a signal, retry
            }
            errExit("epoll_wait");
        }
        printf("Ready %d events\n", ready);

        for (int j = 0; j < ready; j++)
        {
            const int fd = evlist[j].data.fd;
            const auto events = evlist[j].events;
            std::cout << "Event detected on fd: " << fd << " " << events << "\n";

            if (fd == sfd)
            {
                if (!(events & EPOLLIN))
                {
                    // Without a working listening socket the server cannot go on.
                    std::cerr << "Unexpected event on listening socket.\n";
                    return 1;
                }
                std::cout << "New connection detected.\n";
                const int cfd = accept(sfd, nullptr, nullptr);
                if (cfd == -1)
                {
                    errExit("accept");
                }
                // Add the new client socket to the epoll instance
                struct epoll_event clientEv;
                clientEv.events = EPOLLIN; // Interested in read events
                clientEv.data.fd = cfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &clientEv) == -1)
                {
                    errExit("epoll_ctl");
                }
                connected_clients++;
                continue;
            }

            // Client socket. Hang-up or error means the connection is over;
            // data that is still readable is drained and logged first.
            bool closeClient = (events & (EPOLLHUP | EPOLLERR)) != 0;

            if (events & EPOLLIN)
            {
                // sizeof(buf) keeps the read size tied to the actual buffer.
                const ssize_t numRead = read(fd, buf, sizeof(buf));
                if (numRead == -1)
                {
                    errMsg("read");
                    closeClient = true;
                }
                else if (numRead == 0)
                {
                    closeClient = true; // End-of-file: the client closed its end
                }
                else
                {
                    for (ssize_t i = 0; i < numRead; ++i)
                    {
                        // toupper() requires a value representable as unsigned char.
                        buf[i] = static_cast<char>(toupper(static_cast<unsigned char>(buf[i])));
                    }
                    std::cout << "Received " << numRead << " bytes: ";
                    std::cout.write(buf, numRead);
                    std::cout << "\n";

                    // Echo back to the client unless it already hung up.
                    // MSG_NOSIGNAL turns a write to a vanished client into an
                    // error return instead of SIGPIPE.
                    if (!closeClient && send(fd, buf, numRead, MSG_NOSIGNAL) != numRead)
                    {
                        errMsg("partial/failed write");
                        closeClient = true;
                    }
                }
            }
            else if (!closeClient)
            {
                std::cerr << "Unexpected event detected.\n";
                continue; // Skip unexpected events
            }

            if (closeClient)
            {
                std::cout << "Closing fd " << fd << "\n";
                // Remove the client socket from the epoll instance
                if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr) == -1)
                {
                    errExit("epoll_ctl-del");
                }
                if (close(fd) == -1)
                {
                    errExit("close");
                }
                connected_clients--;
            }
        }
        printf("Connected Device %d\n", connected_clients);
    }
}
#