fix(hw_interface): Increase buffersize by 2

This commit is contained in:
2025-10-31 20:10:36 +01:00
parent aff882ebdd
commit 03cf747ad6
10 changed files with 1248 additions and 26 deletions

View File

@@ -0,0 +1,452 @@
# HardwareInterface (`assignments::two::g2_2025_lifecycle_node`)
## Overview
The `HardwareInterface` is a ROS2 node responsible for managing low-level hardware communication with IMU sensors. It abstracts the complexity of dual communication backends (serial and MQTT), provides JSON parsing of sensor data, and publishes standardized `sensor_msgs::msg::Imu` messages to the ROS2 ecosystem. The node handles continuous data acquisition, buffering of fragmented reads, and robust error recovery.
#### Implementation Details
**Public Methods**
- **`start_read()`**: Spawns a background thread that continuously reads JSON payloads from the serial device
- **`stop_read()`**: Signals the reader thread to exit and joins it for clean shutdown
- **`write(const std::string& data)`**: Writes data to the serial device
- **`open_device(const std::string& device_path, int baud_rate)`**: Opens and configures a serial port
- **`is_device_open()`**: Checks if the serial device is currently open
- **`close_device()`**: Closes the serial port and releases resources
- **`mqtt_configure()`**: Initializes MQTT client and broker connection setup
- **`mqtt_reader()`**: Attaches MQTT callbacks to begin receiving messages
- **`mqtt_connect()`**: Establishes connection to the MQTT broker and subscribes to topics
- **`close_mqtt_conn()`**: Disconnects from broker and cleans up MQTT resources
- **`parse_data(const std::string& data)`**: Parses JSON payload into `sensor_msgs::msg::Imu` and publishes
- **`publish_imu_data(const sensor_msgs::msg::Imu::SharedPtr msg)`**: Publishes an IMU message to the ROS topic
**Constructor**
```cpp
HardwareInterface()
```
- Initializes ROS2 node with name `hardware_interface`
- Creates a ROS2 publisher for `sensor_msgs::msg::Imu` on topic `imu_data` with queue size 10
- Logs initialization status
**Member Variables**
- **`serialib serial`**: Encapsulates serial port communication
- **`std::shared_ptr<mqtt::async_client> mqtt_client`**: Persistent MQTT async client for broker communication
- **`std::shared_ptr<mqtt::callback> mqtt_cb`**: MQTT callback handler for message arrival events
- **`std::thread read_thread_`**: Background reader thread for continuous serial data acquisition
- **`std::atomic_bool reading_`**: Thread-safe flag to signal the reader thread to stop
- **`std::string partial_buffer_`**: Accumulates fragmented serial reads until complete messages are available
- **`rclcpp::Publisher<sensor_msgs::msg::Imu>::SharedPtr imu_publisher`**: ROS2 publisher for IMU data
**MQTT Constants**
- **`SERVER_ADDRESS`**: `"tcp://localhost:1883"` — Default MQTT broker address
- **`CLIENT_ID`**: `"cpp_mqtt_client"` — MQTT client identifier
- **`TOPIC`**: `"esp32/imu"` — Default subscription topic for IMU data
---
## Core Functions
### `void start_read()`
Initiates continuous serial data acquisition in a background thread.
**Behavior:**
- Checks if a reader thread is already running; returns if so
- Sets the `reading_` atomic flag to true
- Spawns a thread that:
1. Allocates a 116-byte buffer
2. Enters a loop that runs while `reading_` is true
3. Calls `serial.readString()` with a 1-second timeout
4. Accumulates received bytes into `partial_buffer_`
5. Splits on newline (`\n`) to extract complete lines
6. Trims whitespace and strips leading garbage up to the first `{`
7. Validates that a closing `}` exists; if not, waits for more data
8. Calls `parse_data()` on each complete JSON line
- Returns immediately while the thread continues running
**Error Handling:**
- Invalid JSON lines are logged as errors in `parse_data()` but do not crash the thread
---
### `void stop_read()`
Cleanly terminates the background reader thread.
**Behavior:**
- Returns immediately if not currently reading
- Sets `reading_` atomic flag to false to signal the thread
- Joins the thread to wait for its completion
- Ensures all resources are released before returning
**Thread Safety:**
- Uses atomic flag for lock-free signaling
- Blocks until thread joins, guaranteeing clean shutdown
---
### `void parse_data(const std::string& data)`
Deserializes a JSON string into a ROS2 IMU message and publishes it.
**Behavior:**
- Attempts to parse the input string as JSON using `nlohmann::json`
- Creates a new `sensor_msgs::msg::Imu` message and populates:
- **Header**: Sets `stamp` to current time via `this->now()` and `frame_id` to `"imu_link"`
- **Linear Acceleration**: Extracts from JSON `"accel"` object fields `"x"`, `"y"`, `"z"` (defaults to 0.0 if missing)
- **Angular Velocity**: Extracts from JSON `"gyro"` object fields `"x"`, `"y"`, `"z"` (defaults to 0.0 if missing)
- Logs the parsed IMU values at INFO level
- Calls `publish_imu_data()` to send the message to the ROS topic
**Expected JSON Format:**
```json
{
"accel": {"x": 0.037, "y": -1.164, "z": 9.775},
"gyro": {"x": -0.024, "y": -0.014, "z": -0.001},
"Temp": 41.01
}
```
**Error Handling:**
- Catches `nlohmann::json::exception` and logs parsing errors without crashing
- Handles missing fields gracefully using `.value()` with default 0.0
---
### `void publish_imu_data(const sensor_msgs::msg::Imu::SharedPtr msg)`
Publishes an IMU message to the ROS2 topic.
**Behavior:**
- Dereferences the shared pointer and publishes to `imu_publisher`
- Operation is thread-safe (rclcpp publishers support multi-threaded access)
---
### `void mqtt_configure()`
Sets up the MQTT infrastructure for broker communication.
**Behavior:**
- Creates a persistent `mqtt::async_client` pointing to `SERVER_ADDRESS` if not already created
- Creates a persistent MQTT callback handler if not already created
- Calls `mqtt_connect()` to establish the connection
**Rationale for Persistence:**
- Client and callback objects must outlive this function to maintain the connection
- Using `shared_ptr` ensures proper lifetime management
---
### `void mqtt_reader()`
Attaches callbacks to the MQTT client to begin receiving messages.
**Behavior:**
- Sets the callback handler on the async client via `mqtt_client->set_callback(*mqtt_cb)`
- Logs that the listener has started
- Returns; the async client handles message reception in background threads
---
### `void mqtt_connect()`
Establishes connection to the MQTT broker and subscribes to the sensor topic.
**Behavior:**
- Creates `mqtt::connect_options` with:
- Keep-alive interval: 20 seconds
- Clean session: true (no prior session state restored)
- Calls `mqtt_client->connect()` and waits for completion
- Subscribes to `TOPIC` (default: `"esp32/imu"`) with QoS level 1
- Logs successful connection and subscription
**Error Handling:**
- Catches `mqtt::exception` and logs errors; does not throw or crash
---
### `void close_mqtt_conn()`
Cleanly disconnects from the MQTT broker and cleans up resources.
**Behavior:**
- Checks if the client is connected before attempting disconnect
- Calls `mqtt_client->disconnect()` and waits for completion
- Resets `mqtt_client` and `mqtt_cb` shared pointers to allow object destruction
- Logs disconnection and cleanup status
**Error Handling:**
- Catches `mqtt::exception` and logs errors
- Continues cleanup even if errors occur
---
### `bool open_device(const std::string& device_path, int baud_rate)`
Opens and configures a serial port device.
**Parameters:**
- `device_path`: Path to the serial device (e.g., `"/dev/ttyUSB0"`)
- `baud_rate`: Communication speed in bits per second (e.g., `115200`)
**Returns:**
- `true` if device opened successfully
- `false` if an error occurs
**Behavior:**
- Calls `serial.openDevice()` with the provided path and baud rate
- Checks if the returned value is 1 (success)
- Logs success or error status
---
### `bool is_device_open()`
Queries the current state of the serial device.
**Returns:**
- `true` if the device is open
- `false` otherwise
---
### `void close_device()`
Closes the serial port and releases resources.
**Behavior:**
- Calls `serial.closeDevice()`
- Ensures the device is no longer accessible for reads/writes
---
### `void write(const std::string& data)`
Writes data to the serial device.
**Behavior:**
- Logs the write operation
- Calls `serial.writeString()` with the data
---
## MQTT Callback Handler
### `class callback : public virtual mqtt::callback`
A nested class that implements the Paho MQTT callback interface.
**Method: `message_arrived(mqtt::const_message_ptr msg)`**
- Invoked when a message arrives on a subscribed topic
- Extracts the payload string via `msg->get_payload_str()`
- Calls `parse_data()` to deserialize and publish the IMU message
---
## Data Flow Architecture
### Serial Data Path
```
Physical IMU Device
Serial Port (e.g., /dev/ttyUSB0 @ 115200 baud)
start_read() Background Thread
serial.readString(buffer, 1000ms timeout)
Accumulate into partial_buffer_
Split on '\n' and Extract Complete Lines
Sanitize (trim, strip garbage before '{')
Validate JSON Structure (must have '{' and '}')
parse_data(json_line)
JSON Parse → sensor_msgs::msg::Imu
publish_imu_data() → ROS Topic `imu_data`
```
### MQTT Data Path
```
MQTT Broker (tcp://localhost:1883)
MQTT Async Client (mqtt_client)
Topic Subscription (esp32/imu)
MQTT Callback (message_arrived)
parse_data(payload_string)
JSON Parse → sensor_msgs::msg::Imu
publish_imu_data() → ROS Topic `imu_data`
```
---
## Buffer Management & Message Reconstruction
The `partial_buffer_` member implements a robust strategy for handling fragmented serial reads:
1. **Accumulation**: Each serial read chunk is appended to `partial_buffer_`
2. **Line Splitting**: Buffer is searched for newline delimiters
3. **Validation**: Each line is checked for JSON structure (presence of `{` and `}`)
4. **Sanitization**: Leading garbage (characters before `{`) is stripped
5. **Incomplete Message Handling**: If a line lacks a closing brace, it's pushed back to the buffer and the loop waits for more data
6. **Parse & Publish**: Complete JSON lines are parsed and published
**Why This Matters:**
- Serial reads may return fragments of a JSON message (e.g., `",\"gyro\":{...}"`)
- Multiple messages can arrive in a single read
- Buffering ensures robust handling of all edge cases
---
## Error Handling & Recovery
| Scenario | Behavior | Recovery |
|----------|----------|----------|
| Serial read timeout | Loop continues, checks `reading_` flag | Automatic retry on next iteration |
| Incomplete JSON in buffer | Fragment is retained; waits for next read | No action needed; accumulation handles it |
| JSON parse error | Error logged; thread continues listening | Move to next message |
| Serial device disconnect | readString returns 0; loop continues | Application can reconnect via `open_device()` |
| MQTT broker unreachable | Exception caught and logged | Retry via `mqtt_connect()` |
| MQTT message error | Exception caught and logged | Connection remains for next message |
---
## Thread Safety
- **Atomic Flag**: `reading_` uses `std::atomic_bool` for lock-free thread signaling
- **Publisher Thread-Safety**: rclcpp publishers are thread-safe; `parse_data()` can safely publish from reader thread
- **Resource Cleanup**: `stop_read()` joins the thread before returning, ensuring clean shutdown
- **No Shared Mutable State**: Aside from `reading_` and the publisher, thread does not access other class members during execution
---
## Integration with LifecycleManager
The `LifecycleManager` orchestrates `HardwareInterface` lifecycle:
| Lifecycle Phase | LifecycleManager Call | HardwareInterface Action |
|---|---|---|
| **Configure** | `hw_interface->open_device()` or `mqtt_configure()` | Open serial port or set up MQTT client |
| **Activate** | `hw_interface->start_read()` or `mqtt_reader()` | Spawn reader thread or attach MQTT callbacks |
| **Deactivate** | `hw_interface->stop_read()` or `close_mqtt_conn()` | Stop reader thread and join; disconnect MQTT |
| **Cleanup** | `hw_interface->close_device()` | Release serial port |
---
## Usage Example
### Direct Instantiation (Advanced)
```cpp
// Create an instance (normally managed by LifecycleManager)
auto hw = std::make_shared<HardwareInterface>();
// Serial workflow
hw->open_device("/dev/ttyUSB0", 115200);
hw->start_read();
// ... node spins and publishes IMU data ...
hw->stop_read();
hw->close_device();
// MQTT workflow
hw->mqtt_configure();
hw->mqtt_reader();
// ... node spins and publishes IMU data ...
hw->close_mqtt_conn();
```
### Via LifecycleManager (Recommended)
```bash
# Launch and manage via lifecycle
ros2 run g2_2025_imu_reader_pkg g2_2025_lifecycle_node \
--ros-args -p device_path:=/dev/ttyUSB0 -p baudrate:=115200 -p comm_t:=serial
# Configure and activate
ros2 lifecycle set /lifecycle_manager configure
ros2 lifecycle set /lifecycle_manager activate
# Subscribe to IMU data
ros2 topic echo /imu_data
# Deactivate and cleanup
ros2 lifecycle set /lifecycle_manager deactivate
ros2 lifecycle set /lifecycle_manager shutdown
```
---
## Design Patterns
1. **Abstraction Pattern**: Encapsulates serial and MQTT complexity behind a unified interface
2. **Thread Management**: Background reader thread with atomic signaling for clean shutdown
3. **Buffer Accumulation**: Handles fragmented reads and multi-message batches robustly
4. **Dual Backend Strategy**: Runtime selection of communication mode (serial or MQTT)
5. **JSON Deserialization**: Uses industry-standard `nlohmann::json` for robust parsing
---
## Dependencies
- **rclcpp**: ROS2 C++ client library
- **sensor_msgs**: ROS2 standard sensor message definitions
- **paho-mqtt**: Paho C/C++ MQTT client library
- **nlohmann/json**: Header-only JSON parsing library
- **serialib**: Custom serial communication wrapper
---
## Class Diagram
```
┌─────────────────────────────────────┐
│ HardwareInterface │
│ (rclcpp::Node) │
├─────────────────────────────────────┤
│ Private Members: │
│ - serialib serial │
│ - async_client mqtt_client │
│ - callback mqtt_cb │
│ - thread read_thread_ │
│ - atomic_bool reading_ │
│ - string partial_buffer_ │
│ - Publisher imu_publisher │
├─────────────────────────────────────┤
│ Public Methods: │
│ + start_read() │
│ + stop_read() │
│ + open_device() │
│ + close_device() │
│ + is_device_open() │
│ + write() │
│ + mqtt_configure() │
│ + mqtt_reader() │
│ + mqtt_connect() │
│ + close_mqtt_conn() │
│ + parse_data() │
│ + publish_imu_data() │
└─────────────────────────────────────┘
│ orchestrated by
┌──────────────────┐
│ LifecycleManager │
└──────────────────┘
```