# project/main.py
#
# 41 lines
# 994 B
# Python

from fastapi import FastAPI
from pymodbus.client import ModbusSerialClient
import threading
import time
# Serial link settings for the Modbus device.
serial_port = "COM5"
baud_rate = 9600

# Single shared client; the polling loop connects it and issues reads on it.
modbus_client = ModbusSerialClient(
    port=serial_port,
    baudrate=baud_rate,
)

# Latest value captured by the poller. Holds the string "0" until a read
# succeeds and overwrites it.
modbus_data = "0"
def poll_modbus(poll_interval=0.5):
    """Poll one Modbus discrete input forever and cache its latest state.

    Connects the module-level ``modbus_client`` once, then loops reading
    discrete input 0 and storing the result in the module-level
    ``modbus_data``. The function never returns, so it is meant to be run
    as the target of a background (daemon) thread.

    Args:
        poll_interval: Seconds to sleep between consecutive reads. Keeps the
            loop from saturating the serial bus and a CPU core.
    """
    global modbus_data
    modbus_client.connect()
    while True:
        try:
            # NOTE(review): newer pymodbus releases appear to name this
            # keyword ``slave`` rather than ``unit`` - confirm against the
            # installed version.
            response = modbus_client.read_discrete_inputs(0, 1, unit=1)
            print(response)
            if not response.isError():
                # Discrete-input reads return their payload in ``bits``;
                # ``registers`` belongs to register reads.
                modbus_data = response.bits[0]
                print(modbus_data)
        except Exception as exc:
            # Keep polling after a failed read, but report it instead of
            # swallowing the error silently.
            print(f"Modbus poll failed: {exc!r}")
        time.sleep(poll_interval)
# Launch the poller in the background at import time. Being a daemon thread,
# it does not keep the process alive on shutdown.
_poll_thread = threading.Thread(target=poll_modbus, daemon=True)
_poll_thread.start()

# ASGI application object that the route decorators below attach to.
app = FastAPI()
@app.get("/modbus_data")
async def get_modbus_data():
    # Return the current contents of the module-level ``modbus_data`` cache.
    latest = modbus_data
    return {"modbus_data": latest}
@app.get("/")
async def root():
    # Fixed greeting served at the index route.
    greeting = "Welcome to FastAPI with Modbus"
    return {"message": greeting}
if __name__ == "__main__":
    # uvicorn is imported here because it is only needed when this module is
    # executed directly as a script.
    import uvicorn

    # Host "0.0.0.0" listens on every network interface.
    uvicorn.run(app, port=12345, host="0.0.0.0")