"""Serve the latest Modbus RTU holding-register samples over HTTP.

A daemon thread polls holding register 0 of unit 1 on the configured serial
port roughly once per second and buffers each successful reading.  The
``/modbus_data`` endpoint hands out the buffered readings one at a time,
oldest first.
"""

import logging
import threading
import time
from queue import Empty, Full, Queue

from fastapi import FastAPI
from pymodbus.client import ModbusSerialClient

logger = logging.getLogger(__name__)

serial_port = "COM4"
baud_rate = 9600

# Seconds to wait between two polling attempts.
POLL_INTERVAL_SECONDS = 1

# Upper bound on buffered readings.  Without a bound the queue grows by one
# entry per second whenever no HTTP client is draining it.
MAX_BUFFERED_SAMPLES = 100

# NOTE(review): `method=` here and `unit=` in the read call below look like
# older pymodbus keyword names; newer 3.x releases appear to use `framer=` and
# `slave=` instead -- confirm against the installed pymodbus version.
modbus_client = ModbusSerialClient(method='rtu', port=serial_port, baudrate=baud_rate)
modbus_queue = Queue(maxsize=MAX_BUFFERED_SAMPLES)


def _enqueue_sample(value):
    """Buffer *value*, discarding the oldest reading if the queue is full."""
    while True:
        try:
            modbus_queue.put_nowait(value)
            return
        except Full:
            try:
                modbus_queue.get_nowait()
            except Empty:
                # A reader drained the queue between the two calls; the retry
                # of put_nowait() will now find room.
                continue


def _poll_once():
    """Connect, read holding register 0 from unit 1, buffer it, disconnect."""
    if not modbus_client.connect():
        logger.warning("Could not open Modbus port %s", serial_port)
        return
    try:
        response = modbus_client.read_holding_registers(address=0, count=1, unit=1)
        if response.isError():
            logger.warning("Modbus read returned an error: %s", response)
        else:
            _enqueue_sample(response.registers[0])
    finally:
        # Release the serial port even when the read raises.
        modbus_client.close()


def poll_modbus():
    """Poll the Modbus device forever, pausing between attempts.

    Intended to run in a background thread.  Any exception raised by a single
    polling cycle is logged and the loop carries on, so a transient serial
    fault does not permanently stop data collection.
    """
    while True:
        try:
            _poll_once()
        except Exception:  # thread top-level boundary: log and keep polling
            logger.exception("Unexpected error while polling Modbus")
        time.sleep(POLL_INTERVAL_SECONDS)


threading.Thread(target=poll_modbus, daemon=True).start()

app = FastAPI()


@app.get("/modbus_data")
async def get_modbus_data():
    """Return the oldest buffered reading, or an error payload if none exists."""
    try:
        # Non-blocking: a blocking get() would stall the event loop.
        modbus_value = modbus_queue.get_nowait()
    except Empty:
        return {"error": "No data received"}
    return {"modbus_data": modbus_value}


@app.get("/")
async def root():
    """Return a static welcome message."""
    return {"message": "Welcome to Modbus with FastAPI"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=12345)