38 lines
1000 B
Python
38 lines
1000 B
Python
from fastapi import FastAPI
|
|
from pymodbus.client import ModbusSerialClient as ModbusClient
|
|
import uvicorn
|
|
import threading
|
|
import time
|
|
|
|
app = FastAPI()
|
|
|
|
# Create a Modbus client instance
|
|
client = ModbusClient(method='rtu', port='COM4', baudrate=9600, timeout=1)
|
|
client.connect()
|
|
|
|
# Shared variable to store the Modbus register value
|
|
line = '0'
|
|
|
|
def read_modbus_value():
|
|
global line
|
|
while True:
|
|
# Read holding register (adjust the register address as needed)
|
|
result = client.read_holding_registers(address=1, count=1, unit=1)
|
|
if not result.isError():
|
|
line = str(result.registers[0])
|
|
time.sleep(1) # Poll every 1 second
|
|
|
|
# Start the Modbus reading in a separate thread
|
|
threading.Thread(target=read_modbus_value, daemon=True).start()
|
|
|
|
@app.get("/stat")
|
|
async def get_stat():
|
|
return {'state': line}
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {'message': 'hello world'}
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=12346)
|