#include <blkid/blkid.h>
#include <Python.h>
#include <dirent.h>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <libudev.h>
bool file_exists(const std::string& path) {
struct stat buffer;
return stat(path.c_str(), &buffer) == 0;
}
std::string read_file(const std::string& path) {
std::ifstream file(path);
if (!file.is_open()) return "N/A";
std::string content;
std::getline(file, content);
return content;
}
std::string get_uuid(const std::string& device) {
blkid_cache cache;
if (blkid_get_cache(&cache, "/dev") != 0) {
return "N/A";
}
// Используем флаг 0 вместо BLKID_FLAG_SCAN
blkid_dev blk_device = blkid_get_dev(cache, device.c_str(), 0);
if (blk_device) {
const char* uuid = blkid_get_tag_value(cache, blk_device, "UUID");
if (uuid) {
return std::string(uuid);
}
}
return "N/A";
}
static PyObject* get_block_devices(PyObject* self, PyObject* args) {
PyObject* devices_list = PyList_New(0);
struct udev* udev = udev_new();
if (!udev) {
PyErr_SetString(PyExc_OSError, "Failed to initialize udev");
return NULL;
}
struct udev_enumerate* enumerate = udev_enumerate_new(udev);
udev_enumerate_add_match_subsystem(enumerate, "block");
udev_enumerate_scan_devices(enumerate);
struct udev_list_entry* devices = udev_enumerate_get_list_entry(enumerate);
struct udev_list_entry* dev_list_entry;
udev_list_entry_foreach(dev_list_entry, devices) {
const char* path = udev_list_entry_get_name(dev_list_entry);
struct udev_device* dev = udev_device_new_from_syspath(udev, path);
if (!dev) continue;
const char* name = udev_device_get_sysname(dev);
const char* model = udev_device_get_sysattr_value(dev, "device/model");
const char* serial = udev_device_get_sysattr_value(dev, "device/serial");
// Получаем UUID устройства
std::string uuid = get_uuid(name);
PyObject* device_dict = Py_BuildValue(
"{s:s, s:s, s:s, s:s}",
"name", name ? name : "N/A",
"model", model ? model : "N/A",
"serial", serial ? serial : "N/A",
"uuid", uuid.c_str()
);
PyList_Append(devices_list, device_dict);
Py_DECREF(device_dict);
udev_device_unref(dev);
}
udev_enumerate_unref(enumerate);
udev_unref(udev);
return devices_list;
}
static PyObject* get_mount_points(PyObject* self, PyObject* args) {
PyObject* partitions_list = PyList_New(0);
int fd = open("/proc/self/mountinfo", O_RDONLY);
if (fd == -1) {
PyErr_SetString(PyExc_OSError, "Failed to open /proc/self/mountinfo");
return NULL;
}
char buffer[8192];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
close(fd);
if (bytes_read <= 0) {
return partitions_list;
}
buffer[bytes_read] = '\0';
std::istringstream stream(buffer);
std::string line;
while (std::getline(stream, line)) {
std::istringstream line_stream(line);
std::vector<std::string> tokens;
std::string token;
while (line_stream >> token) {
tokens.push_back(token);
}
if (tokens.size() >= 10) {
std::string mountpoint = tokens[4];
std::string dev_name = tokens[9];
if (dev_name.find("/dev/") != 0) continue;
// Получаем UUID устройства
std::string uuid = get_uuid(dev_name);
std::string rotational_path = "/sys/block/" + dev_name.substr(5) + "/queue/rotational";
std::string type = file_exists(rotational_path) && read_file(rotational_path) == "1" ? "HDD" : "SSD";
PyObject* partition_dict = Py_BuildValue(
"{s:s, s:s, s:s, s:s}",
"mountpoint", mountpoint.c_str(),
"uuid", uuid.c_str(),
"type", type.c_str()
);
PyList_Append(partitions_list, partition_dict);
Py_DECREF(partition_dict);
}
}
return partitions_list;
}
static PyMethodDef LsblkMethods[] = {
{"get_block_devices", get_block_devices, METH_VARARGS, "Get block devices info"},
{"get_mount_points", get_mount_points, METH_VARARGS, "Get mount points info"},
{NULL, NULL, 0, NULL}
};
static struct PyModuleDef lsblkmodule = {
PyModuleDef_HEAD_INIT,
"lsblk",
"Module to retrieve block devices and mount points information",
-1,
LsblkMethods
};
PyMODINIT_FUNC PyInit_lsblk(void) {
return PyModule_Create(&lsblkmodule);
}