feat(tests): Add integration tests

This commit is contained in:
2025-10-08 17:52:21 +02:00
parent 5e1df5367c
commit e42856ae4e
2 changed files with 200 additions and 0 deletions

View File

@@ -165,6 +165,12 @@ if(BUILD_TESTING)
target_link_libraries(${PROJECT_NAME}_test_final_grade_determinator
pqxx pq tomlplusplus::tomlplusplus
)
# Add Python integration tests
find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(${PROJECT_NAME}_integration_test test/test_integration_system.py
TIMEOUT 60
)
endif()
ament_package()

View File

@@ -0,0 +1,194 @@
import time
import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor
from g2_2025_interfaces.msg import Exam, Student
from g2_2025_interfaces.srv import Exams
class IntegrationTestNode(Node):
"""Test node for integration testing of the grade calculator"""
def __init__(self):
super().__init__('integration_test_node')
self.received_exams = []
self.received_students = []
self.student_publisher = self.create_publisher(
Student,
'student_course_management',
10
)
self.exam_subscriber = self.create_subscription(
Exam,
'exam_results',
self.exam_callback,
10
)
self.grade_calculator_client = self.create_client(
Exams,
'grade_calculator_service'
)
def exam_callback(self, msg):
"""Callback for received exam messages"""
self.get_logger().info(f'Received exam: {msg.student_name} - {msg.course_name} - {msg.result}')
self.received_exams.append(msg)
def publish_student_enrollment(self, student_name, course_name):
"""Publish a student enrollment message"""
msg = Student()
msg.student_name = student_name
msg.course_name = course_name
msg.timestamp = self.get_clock().now().to_msg()
self.get_logger().info(f'Publishing student enrollment: {student_name} - {course_name}')
self.student_publisher.publish(msg)
def call_grade_calculator_service(self, student_name, grades):
"""Call the grade calculator service"""
if not self.grade_calculator_client.wait_for_service(timeout_sec=5.0):
self.get_logger().warn('Grade calculator service not available')
return None
request = Exams.Request()
request.student_name = student_name
request.grades = grades
future = self.grade_calculator_client.call_async(request)
return future
def test_message_publishing():
"""Test basic message publishing and receiving"""
rclpy.init()
try:
node = IntegrationTestNode()
executor = SingleThreadedExecutor()
executor.add_node(node)
# Allow some time for setup
time.sleep(1.0)
# Test publishing student enrollment
node.publish_student_enrollment("TestStudent", "TestCourse")
# Spin for a short time to process messages
start_time = time.time()
while time.time() - start_time < 3.0:
executor.spin_once(timeout_sec=0.1)
# Test passes if no exceptions occurred
assert True, "Message publishing test completed"
finally:
node.destroy_node()
rclpy.shutdown()
def test_grade_calculator_service():
"""Test the grade calculator service"""
rclpy.init()
try:
node = IntegrationTestNode()
executor = SingleThreadedExecutor()
executor.add_node(node)
# Test grade calculation
future = node.call_grade_calculator_service("TestStudent", [80, 90, 70])
if future is not None:
# Spin until service response received or timeout
start_time = time.time()
while not future.done() and time.time() - start_time < 10.0:
executor.spin_once(timeout_sec=0.1)
if future.done():
result = future.result()
# Verify the result is reasonable
assert 10 <= result.final_grade <= 100, f"Grade {result.final_grade} is out of valid range"
assert result.final_grade == 80, f"Expected grade 80, got {result.final_grade}"
else:
# Service not available, test passes with warning
print("Warning: Grade calculator service not available during test")
assert True, "Grade calculator service test completed"
finally:
node.destroy_node()
rclpy.shutdown()
def test_system_integration():
"""Test integration between multiple components"""
rclpy.init()
try:
node = IntegrationTestNode()
executor = SingleThreadedExecutor()
executor.add_node(node)
# Clear any existing messages
node.received_exams.clear()
# Publish student enrollment
node.publish_student_enrollment("IntegrationStudent", "IntegrationCourse")
# Spin and wait for potential exam results
start_time = time.time()
while time.time() - start_time < 5.0:
executor.spin_once(timeout_sec=0.1)
assert True, "System integration test completed"
for exam in node.received_exams:
assert 10 <= exam.result <= 100, f"Exam grade {exam.result} is out of valid range"
assert len(exam.student_name) > 0, "Student name should not be empty"
assert len(exam.course_name) > 0, "Course name should not be empty"
finally:
node.destroy_node()
rclpy.shutdown()
def test_multiple_enrollments():
"""Test handling multiple student enrollments"""
rclpy.init()
try:
node = IntegrationTestNode()
executor = SingleThreadedExecutor()
executor.add_node(node)
test_students = [
("Tilmann", "Differentieren"),
("Vincent", "Integreren"),
("Wessel", "Kompjuteren")
]
for student, course in test_students:
node.publish_student_enrollment(student, course)
time.sleep(0.1)
# Spin to process messages
start_time = time.time()
while time.time() - start_time < 3.0:
executor.spin_once(timeout_sec=0.1)
assert True, "Multiple enrollments test completed"
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
test_message_publishing()
test_grade_calculator_service()
test_system_integration()
test_multiple_enrollments()