#include #include #include #include #include #include #include #include #include #include #include bool file_exists(const std::string& path) { struct stat buffer; return stat(path.c_str(), &buffer) == 0; } std::string read_file(const std::string& path) { std::ifstream file(path); if (!file.is_open()) return "N/A"; std::string content; std::getline(file, content); return content; } std::string get_uuid(const std::string& device) { blkid_cache cache; if (blkid_get_cache(&cache, "/dev") != 0) { return "N/A"; } // Используем флаг 0 вместо BLKID_FLAG_SCAN blkid_dev blk_device = blkid_get_dev(cache, device.c_str(), 0); if (blk_device) { const char* uuid = blkid_get_tag_value(cache, blk_device, "UUID"); if (uuid) { return std::string(uuid); } } return "N/A"; } static PyObject* get_block_devices(PyObject* self, PyObject* args) { PyObject* devices_list = PyList_New(0); struct udev* udev = udev_new(); if (!udev) { PyErr_SetString(PyExc_OSError, "Failed to initialize udev"); return NULL; } struct udev_enumerate* enumerate = udev_enumerate_new(udev); udev_enumerate_add_match_subsystem(enumerate, "block"); udev_enumerate_scan_devices(enumerate); struct udev_list_entry* devices = udev_enumerate_get_list_entry(enumerate); struct udev_list_entry* dev_list_entry; udev_list_entry_foreach(dev_list_entry, devices) { const char* path = udev_list_entry_get_name(dev_list_entry); struct udev_device* dev = udev_device_new_from_syspath(udev, path); if (!dev) continue; const char* name = udev_device_get_sysname(dev); const char* model = udev_device_get_sysattr_value(dev, "device/model"); const char* serial = udev_device_get_sysattr_value(dev, "device/serial"); // Получаем UUID устройства std::string uuid = get_uuid(name); PyObject* device_dict = Py_BuildValue( "{s:s, s:s, s:s, s:s}", "name", name ? name : "N/A", "model", model ? model : "N/A", "serial", serial ? serial : "N/A", "uuid", uuid.c_str() ); PyList_Append(devices_list, device_dict); Py_DECREF(device_dict); udev_device_unref(dev); } udev_enumerate_unref(enumerate); udev_unref(udev); return devices_list; } static PyObject* get_mount_points(PyObject* self, PyObject* args) { PyObject* partitions_list = PyList_New(0); int fd = open("/proc/self/mountinfo", O_RDONLY); if (fd == -1) { PyErr_SetString(PyExc_OSError, "Failed to open /proc/self/mountinfo"); return NULL; } char buffer[8192]; ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1); close(fd); if (bytes_read <= 0) { return partitions_list; } buffer[bytes_read] = '\0'; std::istringstream stream(buffer); std::string line; while (std::getline(stream, line)) { std::istringstream line_stream(line); std::vector tokens; std::string token; while (line_stream >> token) { tokens.push_back(token); } if (tokens.size() >= 10) { std::string mountpoint = tokens[4]; std::string dev_name = tokens[9]; if (dev_name.find("/dev/") != 0) continue; // Получаем UUID устройства std::string uuid = get_uuid(dev_name); std::string rotational_path = "/sys/block/" + dev_name.substr(5) + "/queue/rotational"; std::string type = file_exists(rotational_path) && read_file(rotational_path) == "1" ? "HDD" : "SSD"; PyObject* partition_dict = Py_BuildValue( "{s:s, s:s, s:s, s:s}", "mountpoint", mountpoint.c_str(), "uuid", uuid.c_str(), "type", type.c_str() ); PyList_Append(partitions_list, partition_dict); Py_DECREF(partition_dict); } } return partitions_list; } static PyMethodDef LsblkMethods[] = { {"get_block_devices", get_block_devices, METH_VARARGS, "Get block devices info"}, {"get_mount_points", get_mount_points, METH_VARARGS, "Get mount points info"}, {NULL, NULL, 0, NULL} }; static struct PyModuleDef lsblkmodule = { PyModuleDef_HEAD_INIT, "lsblk", "Module to retrieve block devices and mount points information", -1, LsblkMethods }; PyMODINIT_FUNC PyInit_lsblk(void) { return PyModule_Create(&lsblkmodule); }