# HardwareInterface (`assignments::two::g2_2025_lifecycle_node`) ## Overview The `HardwareInterface` is a c++ class responsible for managing low-level hardware communication with ESP32-IMU combination via serial/MQTT and provides JSON parsing of sensor data, and publishes standardized `sensor_msgs::msg::Imu` messages to the database writer. #### Implementation Details **Public Methods** - **`start_read()`**: Spawns a background thread that continuously reads JSON payloads from the serial device - **`stop_read()`**: Signals the reader thread to exit and joins it for clean shutdown - **`write(const std::string& data)`**: Writes data to the serial device - **`open_device(const std::string& device_path, int baud_rate)`**: Opens and configures a serial port - **`is_device_open()`**: Checks if the serial device is currently open - **`close_device()`**: Closes the serial port and releases resources - **`mqtt_configure()`**: Initializes MQTT client and broker connection setup - **`mqtt_reader()`**: Attaches MQTT callbacks to begin receiving messages - **`mqtt_connect()`**: Establishes connection to the MQTT broker and subscribes to topics - **`close_mqtt_conn()`**: Disconnects from broker and cleans up MQTT resources - **`parse_data(const std::string& data)`**: Parses JSON payload into `sensor_msgs::msg::Imu` and publishes - **`publish_imu_data(const sensor_msgs::msg::Imu::SharedPtr msg)`**: Publishes an IMU message to the ROS topic **Constructor** ```cpp HardwareInterface() ``` - Initializes ROS2 node with name `hardware_interface` - Creates a ROS2 publisher for `sensor_msgs::msg::Imu` on topic `imu_data` with queue size 10 - Logs initialization status **Member Variables** - **`serialib serial`**: Encapsulates serial port communication - **`std::shared_ptr mqtt_client`**: Persistent MQTT async client for broker communication - **`std::shared_ptr mqtt_cb`**: MQTT callback handler for message arrival events - **`std::thread read_thread_`**: Background reader thread for continuous serial data acquisition - **`std::atomic_bool reading_`**: Thread-safe flag to signal the reader thread to stop - **`std::string partial_buffer_`**: Accumulates fragmented serial reads until complete messages are available - **`rclcpp::Publisher::SharedPtr imu_publisher`**: ROS2 publisher for IMU data **MQTT Constants** - **`SERVER_ADDRESS`**: `"tcp://localhost:1883"` — Default MQTT broker address - **`CLIENT_ID`**: `"cpp_mqtt_client"` — MQTT client identifier - **`TOPIC`**: `"esp32/imu"` — Default subscription topic for IMU data ## Core Functions ### `void start_read()` Initiates continuous serial data acquisition in a background thread. **Behavior:** - Checks if a reader thread is already running; returns if so - Sets the `reading_` atomic flag to true - Spawns a thread that: 1. Allocates a 116-byte buffer 2. Enters a loop that runs while `reading_` is true 3. Calls `serial.readString()` with a 1-second timeout 4. Accumulates received bytes into `partial_buffer_` 5. Splits on newline (`\n`) to extract complete lines 6. Trims whitespace and strips leading garbage up to the first `{` 7. Validates that a closing `}` exists; if not, waits for more data 8. Calls `parse_data()` on each complete JSON line - Returns immediately while the thread continues running **Error Handling:** - Invalid JSON lines are logged as errors in `parse_data()` but do not crash the thread ### `void stop_read()` Cleanly terminates the background reader thread. **Behavior:** - Returns immediately if not currently reading - Sets `reading_` atomic flag to false to signal the thread - Joins the thread to wait for its completion - Ensures all resources are released before returning **Thread Safety:** - Uses atomic flag for lock-free signaling - Blocks until thread joins, guaranteeing clean shutdown ### `void parse_data(const std::string& data)` Deserializes a JSON string into a ROS2 IMU message and publishes it. **Behavior:** - Attempts to parse the input string as JSON using `nlohmann::json` - Creates a new `sensor_msgs::msg::Imu` message and populates: - **Header**: Sets `stamp` to current time via `this->now()` and `frame_id` to `"imu_link"` - **Linear Acceleration**: Extracts from JSON `"accel"` object fields `"x"`, `"y"`, `"z"` (defaults to 0.0 if missing) - **Angular Velocity**: Extracts from JSON `"gyro"` object fields `"x"`, `"y"`, `"z"` (defaults to 0.0 if missing) - Logs the parsed IMU values at INFO level - Calls `publish_imu_data()` to send the message to the ROS topic **Expected JSON Format:** ```json { "accel": {"x": 0.037, "y": -1.164, "z": 9.775}, "gyro": {"x": -0.024, "y": -0.014, "z": -0.001}, "Temp": 41.01 } ``` **Error Handling:** - Catches `nlohmann::json::exception` and logs parsing errors without crashing - Handles missing fields gracefully using `.value()` with default 0.0 ### `void publish_imu_data(const sensor_msgs::msg::Imu::SharedPtr msg)` Publishes an IMU message to the ROS2 topic. **Behavior:** - Dereferences the shared pointer and publishes to `imu_publisher` - Operation is thread-safe (rclcpp publishers support multi-threaded access) ### `void mqtt_configure()` Sets up the MQTT infrastructure for broker communication. **Behavior:** - Creates a persistent `mqtt::async_client` pointing to `SERVER_ADDRESS` if not already created - Creates a persistent MQTT callback handler if not already created - Calls `mqtt_connect()` to establish the connection **Rationale for Persistence:** - Client and callback objects must outlive this function to maintain the connection - Using `shared_ptr` ensures proper lifetime management ### `void mqtt_reader()` Attaches callbacks to the MQTT client to begin receiving messages. **Behavior:** - Sets the callback handler on the async client via `mqtt_client->set_callback(*mqtt_cb)` - Logs that the listener has started - Returns; the async client handles message reception in background threads ### `void mqtt_connect()` Establishes connection to the MQTT broker and subscribes to the sensor topic. **Behavior:** - Creates `mqtt::connect_options` with: - Keep-alive interval: 20 seconds - Clean session: true (no prior session state restored) - Calls `mqtt_client->connect()` and waits for completion - Subscribes to `TOPIC` (default: `"esp32/imu"`) with QoS level 1 - Logs successful connection and subscription **Error Handling:** - Catches `mqtt::exception` and logs errors; does not throw or crash ### `void close_mqtt_conn()` Cleanly disconnects from the MQTT broker and cleans up resources. **Behavior:** - Checks if the client is connected before attempting disconnect - Calls `mqtt_client->disconnect()` and waits for completion - Resets `mqtt_client` and `mqtt_cb` shared pointers to allow object destruction - Logs disconnection and cleanup status **Error Handling:** - Catches `mqtt::exception` and logs errors - Continues cleanup even if errors occur ### `bool open_device(const std::string& device_path, int baud_rate)` Opens and configures a serial port device. **Parameters:** - `device_path`: Path to the serial device (e.g., `"/dev/ttyUSB0"`) - `baud_rate`: Communication speed in bits per second (e.g., `115200`) **Returns:** - `true` if device opened successfully - `false` if an error occurs **Behavior:** - Calls `serial.openDevice()` with the provided path and baud rate - Checks if the returned value is 1 (success) - Logs success or error status ### `bool is_device_open()` Queries the current state of the serial device. **Returns:** - `true` if the device is open - `false` otherwise ### `void close_device()` Closes the serial port and releases resources. **Behavior:** - Calls `serial.closeDevice()` - Ensures the device is no longer accessible for reads/writes ### `void write(const std::string& data)` Writes data to the serial device. **Behavior:** - Logs the write operation - Calls `serial.writeString()` with the data ## MQTT Callback Handler ### `class callback : public virtual mqtt::callback` A nested class that implements the Paho MQTT callback interface. **Method: `message_arrived(mqtt::const_message_ptr msg)`** - Invoked when a message arrives on a subscribed topic - Extracts the payload string via `msg->get_payload_str()` - Calls `parse_data()` to deserialize and publish the IMU message ## Data Flow Architecture ### Serial Data Path ``` Physical IMU Device ↓ Serial Port (e.g., /dev/ttyUSB0 @ 115200 baud) ↓ start_read() Background Thread ↓ serial.readString(buffer, 1000ms timeout) ↓ Accumulate into partial_buffer_ ↓ Split on '\n' and Extract Complete Lines ↓ Sanitize (trim, strip garbage before '{') ↓ Validate JSON Structure (must have '{' and '}') ↓ parse_data(json_line) ↓ JSON Parse → sensor_msgs::msg::Imu ↓ publish_imu_data() → ROS Topic `imu_data` ``` ### MQTT Data Path ``` MQTT Broker (tcp://localhost:1883) ↓ MQTT Async Client (mqtt_client) ↓ Topic Subscription (esp32/imu) ↓ MQTT Callback (message_arrived) ↓ parse_data(payload_string) ↓ JSON Parse → sensor_msgs::msg::Imu ↓ publish_imu_data() → ROS Topic `imu_data` ``` ## Buffer Management & Message Reconstruction The `partial_buffer_` member implements a robust strategy for handling fragmented serial reads: 1. **Accumulation**: Each serial read chunk is appended to `partial_buffer_` 2. **Line Splitting**: Buffer is searched for newline delimiters 3. **Validation**: Each line is checked for JSON structure (presence of `{` and `}`) 4. **Sanitization**: Leading garbage (characters before `{`) is stripped 5. **Incomplete Message Handling**: If a line lacks a closing brace, it's pushed back to the buffer and the loop waits for more data 6. **Parse & Publish**: Complete JSON lines are parsed and published **Why This Matters:** - Serial reads may return fragments of a JSON message (e.g., `",\"gyro\":{...}"`) - Multiple messages can arrive in a single read - Buffering ensures robust handling of all edge cases ## Error Handling & Recovery | Scenario | Behavior | Recovery | |----------|----------|----------| | Serial read timeout | Loop continues, checks `reading_` flag | Automatic retry on next iteration | | Incomplete JSON in buffer | Fragment is retained; waits for next read | No action needed; accumulation handles it | | JSON parse error | Error logged; thread continues listening | Move to next message | | Serial device disconnect | readString returns 0; loop continues | Application can reconnect via `open_device()` | | MQTT broker unreachable | Exception caught and logged | Retry via `mqtt_connect()` | | MQTT message error | Exception caught and logged | Connection remains for next message | ## Thread Safety - **Atomic Flag**: `reading_` uses `std::atomic_bool` for lock-free thread signaling - **Publisher Thread-Safety**: rclcpp publishers are thread-safe; `parse_data()` can safely publish from reader thread - **Resource Cleanup**: `stop_read()` joins the thread before returning, ensuring clean shutdown - **No Shared Mutable State**: Aside from `reading_` and the publisher, thread does not access other class members during execution --- ## Integration with LifecycleManager The `LifecycleManager` orchestrates `HardwareInterface` lifecycle: | Lifecycle Phase | LifecycleManager Call | HardwareInterface Action | |---|---|---| | **Configure** | `hw_interface->open_device()` or `mqtt_configure()` | Open serial port or set up MQTT client | | **Activate** | `hw_interface->start_read()` or `mqtt_reader()` | Spawn reader thread or attach MQTT callbacks | | **Deactivate** | `hw_interface->stop_read()` or `close_mqtt_conn()` | Stop reader thread and join; disconnect MQTT | | **Cleanup** | `hw_interface->close_device()` | Release serial port | --- ## Usage Example ### Direct Instantiation (Advanced) ```cpp // Create an instance (normally managed by LifecycleManager) auto hw = std::make_shared(); // Serial workflow hw->open_device("/dev/ttyUSB0", 115200); hw->start_read(); // ... node spins and publishes IMU data ... hw->stop_read(); hw->close_device(); // MQTT workflow hw->mqtt_configure(); hw->mqtt_reader(); // ... node spins and publishes IMU data ... hw->close_mqtt_conn(); ``` ## Design Patterns 1. **Abstraction Pattern**: Encapsulates serial and MQTT complexity behind a unified interface 2. **Thread Management**: Background reader thread with atomic signaling for clean shutdown 3. **Buffer Accumulation**: Handles fragmented reads and multi-message batches robustly 4. **Dual Backend Strategy**: Runtime selection of communication mode (serial or MQTT) 5. **JSON Deserialization**: Uses industry-standard `nlohmann::json` for robust parsing ## Dependencies - **rclcpp**: ROS2 C++ client library - **sensor_msgs**: ROS2 standard sensor message definitions - **paho-mqtt**: Paho C/C++ MQTT client library - **nlohmann/json**: Header-only JSON parsing library - **serialib**: Custom serial communication wrapper