В этом руководстве объясняется, как подключать и считывать данные с датчика температуры DS18x20. Библиотека onewire также доступна в репозитории pycom-library GitHub.
import time
from machine import Pin
from onewire import DS18X20
from onewire import OneWire
# DS18B20 data line connected to pin P10
ow = OneWire(Pin(′P10′))
temp = DS18X20(ow)
while True:
print(temp.read_temp_async())
time.sleep(1)
temp.start_conversion()
time.sleep(1)
#!/usr/bin/env python3
"""
OneWire library for MicroPython
"""
import time
import machine
class OneWire:
CMD_SEARCHROM = const(0xf0)
CMD_READROM = const(0x33)
CMD_MATCHROM = const(0x55)
CMD_SKIPROM = const(0xcc)
def __init__(self, pin):
self.pin = pin
self.pin.init(pin.OPEN_DRAIN, pin.PULL_UP)
def reset(self):
"""
Perform the onewire reset function.
Returns True if a device asserted a presence pulse, False otherwise.
"""
sleep_us = time.sleep_us
disable_irq = machine.disable_irq
enable_irq = machine.enable_irq
pin = self.pin
pin(0)
sleep_us(480)
i = disable_irq()
pin(1)
sleep_us(60)
status = not pin()
enable_irq(i)
sleep_us(420)
return status
def read_bit(self):
sleep_us = time.sleep_us
enable_irq = machine.enable_irq
pin = self.pin
pin(1) # half of the devices don′t match CRC without this line
i = machine.disable_irq()
pin(0)
sleep_us(1)
pin(1)
sleep_us(1)
value = pin()
enable_irq(i)
sleep_us(40)
return value
def read_byte(self):
value = 0
for i in range(8):
value |= self.read_bit() << i
return value
def read_bytes(self, count):
buf = bytearray(count)
for i in range(count):
buf[i] = self.read_byte()
return buf
def write_bit(self, value):
sleep_us = time.sleep_us
pin = self.pin
i = machine.disable_irq()
pin(0)
sleep_us(1)
pin(value)
sleep_us(60)
pin(1)
sleep_us(1)
machine.enable_irq(i)
def write_byte(self, value):
for i in range(8):
self.write_bit(value & 1)
value >>= 1
def write_bytes(self, buf):
for b in buf:
self.write_byte(b)
def select_rom(self, rom):
"""
Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
"""
self.reset()
self.write_byte(CMD_MATCHROM)
self.write_bytes(rom)
def crc8(self, data):
"""
Compute CRC
"""
crc = 0
for i in range(len(data)):
byte = data[i]
for b in range(8):
fb_bit = (crc ^ byte) & 0x01
if fb_bit == 0x01:
crc = crc ^ 0x18
crc = (crc >> 1) & 0x7f
if fb_bit == 0x01:
crc = crc | 0x80
byte = byte >> 1
return crc
def scan(self):
"""
Return a list of ROMs for all attached devices.
Each ROM is returned as a bytes object of 8 bytes.
"""
devices = []
diff = 65
rom = False
for i in range(0xff):
rom, diff = self._search_rom(rom, diff)
if rom:
devices += [rom]
if diff == 0:
break
return devices
def _search_rom(self, l_rom, diff):
if not self.reset():
return None, 0
self.write_byte(CMD_SEARCHROM)
if not l_rom:
l_rom = bytearray(8)
rom = bytearray(8)
next_diff = 0
i = 64
for byte in range(8):
r_b = 0
for bit in range(8):
b = self.read_bit()
if self.read_bit():
if b: # нет девайсов или ошибка
return None, 0
else:
if not b: # противоречие (два девайса)
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
b = 1
next_diff = i
self.write_bit(b)
if b:
r_b |= 1 << bit
i -= 1
rom[byte] = r_b
return rom, next_diff
class DS18X20(object):
def __init__(self, onewire):
self.ow = onewire
self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
def isbusy(self):
"""
Checks wether one of the DS18x20 devices on the bus is busy
performing a temperature conversion
"""
return not self.ow.read_bit()
def start_conversion(self, rom=None):
"""
Start the temp conversion on one DS18x20 device.
Pass the 8-byte bytes object with the ROM of the specific device you want to read.
If only one DS18x20 device is attached to the bus you may omit the rom parameter.
"""
rom = rom or self.roms[0]
ow = self.ow
ow.reset()
ow.select_rom(rom)
ow.write_byte(0x44) # Convert Temp
def read_temp_async(self, rom=None):
"""
Read the temperature of one DS18x20 device if the conversion is complete,
otherwise return None.
"""
if self.isbusy():
return None
rom = rom or self.roms[0]
ow = self.ow
ow.reset()
ow.select_rom(rom)
ow.write_byte(0xbe) # Read scratch
data = ow.read_bytes(9)
return self.convert_temp(rom[0], data)
def convert_temp(self, rom0, data):
"""
Convert the raw temperature data into degrees celsius and return as a fixed point with 2 decimal places.
"""
temp_lsb = data[0]
temp_msb = data[1]
if rom0 == 0x10:
if temp_msb != 0:
# отрицательное число
temp_read = temp_lsb >> 1 | 0x80 # обрезать нулевой бит путём сдвига
temp_read = -((~temp_read + 1) & 0xff) # конвертировать из двух
else:
temp_read = temp_lsb >> 1 # обрезать нулевой бит
count_remain = data[6]
count_per_c = data[7]
temp = 100 * temp_read - 25 + (count_per_c - count_remain) // count_per_c
return temp
elif rom0 == 0x28:
return (temp_msb << 8 | temp_lsb) * 100 // 16
else:
assert False
MicroPython поддерживает порождение потоков с помощью модуля _thread. В следующем примере демонстрируется использование этого модуля. Поток определяется как функция, которая может принимать любое количество параметров. Ниже запускаются 3 потока, каждый из которых выполняет печать с различным интервалом.
import _thread
import time
def th_func(delay, id):
while True:
time.sleep(delay)
print(′Running thread %d′ % id)
for i in range(3):
_thread.start_new_thread(th_func, (i + 1, i))
import _thread
a_lock = _thread.allocate_lock()
with a_lock:
print("a_lock is locked while this executes")
По умолчанию индикатор сердцебиения мигает синим цветом каждые 4 секунды, сигнализируя о том, что система активна. Это может быть отменено через модуль pycom.
import pycom
pycom.heartbeat(False)
pycom.rgbled(0xff00) # включает светодиод RGB зеленого цвета
Светодиодный индикатор сердцебиения также используется, чтобы указать, что была обнаружена ошибка.
В следующем фрагменте кода используется светодиод RGB для создания светофора, который работает в течение 10 циклов.
import pycom
import time
pycom.heartbeat(False)
for cycles in range(10): # стоп через 10 циклов
pycom.rgbled(0x007f00) # зеленый
time.sleep(5)
pycom.rgbled(0x7f7f00) # желтый
time.sleep(1.5)
pycom.rgbled(0x7f0000) # красный
time.sleep(4)
Вот ожидаемый результат:
Подробную информацию об этом классе можно найти здесь.
Хронометр можно использовать для измерения того, сколько времени прошло в блоке кода. В следующем примере используется простой секундомер.
from machine import Timer
import time
chrono = Timer.Chrono()
chrono.start()
time.sleep(1.25) # первый круг – 1,25 секунды
lap = chrono.read() # подсчёт времени без остановки
time.sleep(1.5)
chrono.stop()
total = chrono.read()
print()
print(" the racer took %f seconds to finish the race" % total)
print(" %f seconds in the first lap" % lap)
print(" %f seconds in the last lap" % (total - lap))
Тревога может использоваться для получения остановок через определенный интервал. Следующий код выполняет обратный вызов каждую секунду в течение 10 секунд.
from machine import Timer
class Clock:
def __init__(self):
self.seconds = 0
self.__alarm = Timer.Alarm(self._seconds_handler, 1, periodic=True)
def _seconds_handler(self, alarm):
self.seconds += 1
print("%02d seconds have passed" % self.seconds)
if self.seconds == 10:
alarm.callback(None) # stop counting after 10 seconds
clock = Clock()
Нет никаких ограничений на то, что может быть сделано во время остановки. Например, можно даже выполнять сетевые запросы. Однако важно помнить, что остановки обрабатываются последовательно, поэтому рекомендуется делать их короткими. Дополнительную информацию можно найти в разделе «Обработка остановок».
Этот код читает триггеры ИК-датчика и отправляет HTTP-запрос для каждого триггера, в данном случае для установки Domoticz. Когда постоянно обнаруживается движение, датчик PIR поддерживает высокий уровень вывода, и в этом случае код будет продолжать отправлять HTTP-запросы каждые 10 секунд (настраивается с помощью переменной hold_time).
import time
from network import WLAN
from machine import Pin
from domoticz import Domoticz
wl = WLAN(WLAN.STA)
d = Domoticz("", 8080 ,"")
#config
hold_time_sec = 10
#flags
last_trigger = -10
pir = Pin(′G4′,mode=Pin.IN, pull=Pin.PULL_UP)
# main loop
print("Starting main loop")
while True:
if pir() == 1:
if time.time() - last_trigger > hold_time_sec:
last_trigger = time.time()
print("Presence detected, sending HTTP request")
try:
return_code = d.setVariable(′Presence:LivingRoom′,′1′)
print("Request result: "+str(return_code))
except Exception as e:
print("Request failed")
print(e)
else:
last_trigger = 0
print("No presence")
time.sleep_ms(500)
print("Exited main loop")
Дополнительные скрипты WiFi смотрите в пошаговом руководстве wlan.
import os
import machine
uart = machine.UART(0, 115200)
os.dupterm(uart)
known_nets = {
′NetworkID′: {′pwd′: ′′, ′wlan_config′: (′10.0.0.8′, ′255.255.0.0′, ′10.0.0.1′, ′10.0.0.1′)},
}
from network import WLAN
wl = WLAN()
if machine.reset_cause() != machine.SOFT_RESET:
wl.mode(WLAN.STA)
original_ssid = wl.ssid()
original_auth = wl.auth()
print("Scanning for known wifi nets")
available_nets = wl.scan()
nets = frozenset([e.ssid for e in available_nets])
known_nets_names = frozenset([key for key in known_nets])
net_to_use = list(nets & known_nets_names)
try:
net_to_use = net_to_use[0]
net_properties = known_nets[net_to_use]
pwd = net_properties[′pwd′]
sec = [e.sec for e in available_nets if e.ssid == net_to_use][0]
if ′wlan_config′ in net_properties:
wl.ifconfig(config=net_properties[′wlan_config′])
wl.connect(net_to_use, (sec, pwd), timeout=10000)
while not wl.isconnected():
machine.idle() # save power while waiting
print("Connected to "+net_to_use+" with IP address:" + wl.ifconfig()[0])
except Exception as e:
print("Failed to connect to any known network, going into AP mode")
wl.init(mode=WLAN.AP, ssid=original_ssid, auth=original_auth, channel=6, antenna=WLAN.INT_ANT)
import socket
class Domoticz:
def __init__(self, ip, port, basic):
self.basic = basic
self.ip = ip
self.port = port
def setLight(self, idx, command):
return self.sendRequest("type=command¶m=switchlight&idx="+idx+"&switchcmd="+command)
def setVariable(self, name, value):
return self.sendRequest("type=command¶m=updateuservariable&vtype=0&vname="+name+"&vvalue="+value)
def sendRequest(self, path):
try:
s = socket.socket()
s.connect((self.ip,self.port))
s.send(b"GET /json.htm?"+path+" HTTP/1.1 Host: pycom.io Authorization: Basic "+self.basic+" ")
status = str(s.readline(), ′utf8′)
code = status.split(" ")[1]
s.close()
return code
except Exception:
print("HTTP request failed")
return 0
Modbus - это протокол обмена сообщениями, который определяет структуру пакета для передачи данных между устройствами в архитектуре ведущий / ведомый. Протокол не зависит от среды передачи и обычно передается по TCP (MODBUS TCP) или по последовательной связи (MODBUS RTU). Modbus предназначен для использования в качестве протокола запроса / ответа и предоставляет услуги, определенные кодами функций. Код функции в запросе сообщает ведомому, какое действие следует выполнить. Коды функций, которые поддерживаются устройствами, перечислены ниже.
Для получения дополнительной информации о MODBUS RTU смотрите следующий файл PDF. Информацию о MODBUS TCP можно найти здесь.
Библиотеки Python и примеры кода, поддерживающие Modbus TCP и Modbus RTU, доступны в следующем репозитории GitHub. Чтобы использовать эту библиотеку, подключитесь к целевому устройству Pycom через ftp и загрузите папку uModbus в /flash. Описание поддерживаемых кодов приведено ниже.
Эта функция запрашивает состояние (ВКЛ/ВЫКЛ) дискретных катушек на удаленном устройстве. Адрес устройства, адрес первой катушки и количество катушек должны быть указаны в запросе. Адрес первой катушки равен 0, и можно прочитать не более 2000 смежных катушек. Пример кода Python показан ниже.
slave_addr=0x0A
starting_address=0x00
coil_quantity=100
coil_status = modbus_obj.read_coils(slave_addr, starting_address, coil_quantity)
print(′Coil status: ′ + ′ ′.join(′{:d}′.format(x) for x in coil_status))
Эта команда используется для чтения состояния (ВКЛ / ВЫКЛ) дискретных входов на удаленном устройстве. Адрес устройства, адрес первого входа и количество считываемых входов должны быть указаны в запросе. Адрес первого входа равен 0, можно прочитать до 2000 непрерывных входов. Пример кода Python показан ниже.
slave_addr=0x0A
starting_address=0x0
input_quantity=100
input_status = modbus_obj.read_discrete_inputs(slave_addr, starting_address, input_quantity)
print(′Input status: ′ + ′ ′.join(′{:d}′.format(x) for x in input_status))
Этот код функции используется для считывания содержимого регистров хранения аналоговых выходов. Адрес подчиненного устройства, адрес начального регистра, количество регистров для чтения и знак данных должны быть указаны. Адреса регистров начинаются с 0, и можно прочитать не более 125 непрерывных регистров.
slave_addr=0x0A
starting_address=0x00
register_quantity=100
signed=True
register_value = modbus_obj.read_holding_registers(slave_addr, starting_address, register_quantity, signed)
print(′Holding register value: ′ + ′ ′.join(′{:d}′.format(x) for x in register_value))
Эта команда используется для считывания до 125 непрерывных входных регистров на удаленном устройстве. Адрес устройства, адрес начального регистра, количество входных регистров и знак данных должны быть указаны в запросе. Адрес первых входных регистров равен 0.
slave_addr=0x0A
starting_address=0x00
register_quantity=100
signed=True
register_value = modbus_obj.read_input_registers(slave_addr, starting_address, register_quantity, signed)
print(′Input register value: ′ + ′ ′.join(′{:d}′.format(x) for x in register_value))
Этот код функции используется для записи состояния дискретной катушки на удаленном устройстве. Значение 0xFF00 означает, что катушка должна быть установлена в положение ON, а значение 0x0000 означает, что катушка должна быть установлена в положение OFF. Пример кода Python для установки катушки по адресу 0x00 во включенное состояние показан ниже.
slave_addr=0x0A
output_address=0x00
output_value=0xFF00
return_flag = modbus_obj.write_single_coil(slave_addr, output_address, output_value)
output_flag = ′Success′ if return_flag else ′Failure′
print(′Writing single coil status: ′ + output_flag)
Эта команда используется для записи содержимого регистра хранения аналогового выхода на удаленном устройстве. Адрес подчиненного устройства, адрес регистра, значение регистра и подпись данных должны быть указаны в запросе. Как и для всех других команд, адреса регистров начинаются с 0.
slave_addr=0x0A
register_address=0x01
register_value=-32768
signed=True
return_flag = modbus_obj.write_single_register(slave_addr, register_address, register_value, signed)
output_flag = ′Success′ if return_flag else ′Failure′
print(′Writing single coil status: ′ + output_flag)
Этот код функции используется для установки непрерывной последовательности катушек на удаленном устройстве в положение ВКЛ или ВЫКЛ. Адрес устройства, начальный адрес катушек и массив с состояниями катушек должны быть указаны в запросе.
slave_addr=0x0A
starting_address=0x00
output_values=[1,1,1,0,0,1,1,1,0,0,1,1,1]
return_flag = modbus_obj.write_multiple_coils(slave_addr, starting_address, output_values)
output_flag = ′Success′ if return_flag else ′Failure′
print(′Writing multiple coil status: ′ + output_flag)
Эта команда используется для записи содержимого непрерывной последовательности аналоговых регистров на удаленном устройстве. Адрес подчиненного устройства, адрес начального регистра, значения регистров и подпись данных должны быть указаны в запросе. Адрес первого регистра равен 0, можно записать максимум 125 значений регистра. Пример кода Python показан ниже.
slave_addr=0x0A
register_address=0x01
register_values=[2, -4, 6, -256, 1024]
signed=True
return_flag = modbus_obj.write_multiple_registers(slave_addr, register_address, register_values, signed)
output_flag = ′Success′ if return_flag else ′Failure′
print(′Writing multiple register status: ′ + output_flag)
г. Москва, Пятницкое ш. д. 18, пав. 566
zakaz@compacttool.ru
8-495-752-55-22
Информация представленная на данном информационном ресурсе преследует исключительно рекламные цели и не является договором-офертой !
© Все права защищены 2015 - 2024г https://compacttool.ru