Каталог товаров

Драйвер Onewire

В этом руководстве объясняется, как подключать и считывать данные с датчика температуры DS18x20. Библиотека onewire также доступна в репозитории pycom-library GitHub.

 

Основное использование

import time

from machine import Pin

from onewire import DS18X20

from onewire import OneWire

 

# DS18B20 data line connected to pin P10

ow = OneWire(Pin(′P10′))

temp = DS18X20(ow)

 

while True:

    print(temp.read_temp_async())

    time.sleep(1)

    temp.start_conversion()

    time.sleep(1)

 

Библиотека

#!/usr/bin/env python3

 

"""

OneWire library for MicroPython

"""

 

import time

import machine

 

class OneWire:

    CMD_SEARCHROM = const(0xf0)

    CMD_READROM = const(0x33)

    CMD_MATCHROM = const(0x55)

    CMD_SKIPROM = const(0xcc)

 

    def __init__(self, pin):

        self.pin = pin

        self.pin.init(pin.OPEN_DRAIN, pin.PULL_UP)

 

    def reset(self):

        """

        Perform the onewire reset function.

        Returns True if a device asserted a presence pulse, False otherwise.

        """

        sleep_us = time.sleep_us

        disable_irq = machine.disable_irq

        enable_irq = machine.enable_irq

        pin = self.pin

 

        pin(0)

        sleep_us(480)

        i = disable_irq()

        pin(1)

        sleep_us(60)

        status = not pin()

        enable_irq(i)

        sleep_us(420)

        return status

 

    def read_bit(self):

        sleep_us = time.sleep_us

        enable_irq = machine.enable_irq

        pin = self.pin

 

        pin(1) # half of the devices don′t match CRC without this line

        i = machine.disable_irq()

        pin(0)

        sleep_us(1)

        pin(1)

        sleep_us(1)

        value = pin()

        enable_irq(i)

        sleep_us(40)

        return value

 

    def read_byte(self):

        value = 0

        for i in range(8):

            value |= self.read_bit() << i

        return value

 

    def read_bytes(self, count):

        buf = bytearray(count)

        for i in range(count):

            buf[i] = self.read_byte()

        return buf

 

    def write_bit(self, value):

        sleep_us = time.sleep_us

        pin = self.pin

 

        i = machine.disable_irq()

        pin(0)

        sleep_us(1)

        pin(value)

        sleep_us(60)

        pin(1)

        sleep_us(1)

        machine.enable_irq(i)

 

    def write_byte(self, value):

        for i in range(8):

            self.write_bit(value & 1)

            value >>= 1

 

    def write_bytes(self, buf):

        for b in buf:

            self.write_byte(b)

 

    def select_rom(self, rom):

        """

        Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).

        """

        self.reset()

        self.write_byte(CMD_MATCHROM)

        self.write_bytes(rom)

 

    def crc8(self, data):

        """

        Compute CRC

        """

        crc = 0

        for i in range(len(data)):

            byte = data[i]

            for b in range(8):

                fb_bit = (crc ^ byte) & 0x01

                if fb_bit == 0x01:

                    crc = crc ^ 0x18

                crc = (crc >> 1) & 0x7f

                if fb_bit == 0x01:

                    crc = crc | 0x80

                byte = byte >> 1

        return crc

 

    def scan(self):

        """

        Return a list of ROMs for all attached devices.

        Each ROM is returned as a bytes object of 8 bytes.

        """

        devices = []

        diff = 65

        rom = False

        for i in range(0xff):

            rom, diff = self._search_rom(rom, diff)

            if rom:

                devices += [rom]

            if diff == 0:

                break

        return devices

 

    def _search_rom(self, l_rom, diff):

        if not self.reset():

            return None, 0

        self.write_byte(CMD_SEARCHROM)

        if not l_rom:

            l_rom = bytearray(8)

        rom = bytearray(8)

        next_diff = 0

        i = 64

        for byte in range(8):

            r_b = 0

            for bit in range(8):

                b = self.read_bit()

                if self.read_bit():

                    if b: # нет девайсов или ошибка

                        return None, 0

                else:

                    if not b: # противоречие (два девайса)

                        if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):

                            b = 1

                            next_diff = i

                self.write_bit(b)

                if b:

                    r_b |= 1 << bit

                i -= 1

            rom[byte] = r_b

        return rom, next_diff

 

class DS18X20(object):

    def __init__(self, onewire):

        self.ow = onewire

        self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]

 

    def isbusy(self):

        """

        Checks wether one of the DS18x20 devices on the bus is busy

        performing a temperature conversion

        """

        return not self.ow.read_bit()

 

    def start_conversion(self, rom=None):

        """

        Start the temp conversion on one DS18x20 device.

        Pass the 8-byte bytes object with the ROM of the specific device you want to read.

        If only one DS18x20 device is attached to the bus you may omit the rom parameter.

        """

        rom = rom or self.roms[0]

        ow = self.ow

        ow.reset()

        ow.select_rom(rom)

        ow.write_byte(0x44)  # Convert Temp

 

    def read_temp_async(self, rom=None):

        """

        Read the temperature of one DS18x20 device if the conversion is complete,

        otherwise return None.

        """

        if self.isbusy():

            return None

        rom = rom or self.roms[0]

        ow = self.ow

        ow.reset()

        ow.select_rom(rom)

        ow.write_byte(0xbe)  # Read scratch

        data = ow.read_bytes(9)

        return self.convert_temp(rom[0], data)

 

    def convert_temp(self, rom0, data):

        """

        Convert the raw temperature data into degrees celsius and return as a fixed point with 2 decimal places.

        """

        temp_lsb = data[0]

        temp_msb = data[1]

        if rom0 == 0x10:

            if temp_msb != 0:

                # отрицательное число

                temp_read = temp_lsb >> 1 | 0x80  # обрезать нулевой бит путём сдвига

                temp_read = -((~temp_read + 1) & 0xff) # конвертировать из двух

            else:

                temp_read = temp_lsb >> 1  # обрезать нулевой бит

            count_remain = data[6]

            count_per_c = data[7]

            temp = 100 * temp_read - 25 + (count_per_c - count_remain) // count_per_c

            return temp

        elif rom0 == 0x28:

            return (temp_msb << 8 | temp_lsb) * 100 // 16

        else:

            assert False

 

Многопоточность

MicroPython поддерживает порождение потоков с помощью модуля _thread. В следующем примере демонстрируется использование этого модуля. Поток определяется как функция, которая может принимать любое количество параметров. Ниже запускаются 3 потока, каждый из которых выполняет печать с различным интервалом.

 

import _thread

import time

 

def th_func(delay, id):

    while True:

        time.sleep(delay)

        print(′Running thread %d′ % id)

 

for i in range(3):

    _thread.start_new_thread(th_func, (i + 1, i))

 

Использование замков:

import _thread

 

a_lock = _thread.allocate_lock()

 

with a_lock:

    print("a_lock is locked while this executes")

 

RGB LED

По умолчанию индикатор сердцебиения мигает синим цветом каждые 4 секунды, сигнализируя о том, что система активна. Это может быть отменено через модуль pycom.

 

import pycom

 

pycom.heartbeat(False)

pycom.rgbled(0xff00)   # включает светодиод RGB зеленого цвета

Светодиодный индикатор сердцебиения также используется, чтобы указать, что была обнаружена ошибка.

В следующем фрагменте кода используется светодиод RGB для создания светофора, который работает в течение 10 циклов.

import pycom

import time

 

pycom.heartbeat(False)

for cycles in range(10): # стоп через 10 циклов

    pycom.rgbled(0x007f00) # зеленый

    time.sleep(5)

    pycom.rgbled(0x7f7f00) # желтый

    time.sleep(1.5)

    pycom.rgbled(0x7f0000) # красный

    time.sleep(4)

Вот ожидаемый результат:

RGB LED

Таймеры

Подробную информацию об этом классе можно найти здесь.

Хронометр

Хронометр можно использовать для измерения того, сколько времени прошло в блоке кода. В следующем примере используется простой секундомер.

from machine import Timer

import time

 

chrono = Timer.Chrono()

 

chrono.start()

time.sleep(1.25) # первый круг – 1,25 секунды

lap = chrono.read() # подсчёт времени без остановки

time.sleep(1.5)

chrono.stop()

total = chrono.read()

 

print()

print(" the racer took %f seconds to finish the race" % total)

print("  %f seconds in the first lap" % lap)

print("  %f seconds in the last lap" % (total - lap))

 

Тревога

Тревога может использоваться для получения остановок через определенный интервал. Следующий код выполняет обратный вызов каждую секунду в течение 10 секунд.

 

from machine import Timer

class Clock:

    def __init__(self):

        self.seconds = 0

        self.__alarm = Timer.Alarm(self._seconds_handler, 1, periodic=True)

    def _seconds_handler(self, alarm):

        self.seconds += 1

        print("%02d seconds have passed" % self.seconds)

        if self.seconds == 10:

            alarm.callback(None) # stop counting after 10 seconds

clock = Clock()

 

Нет никаких ограничений на то, что может быть сделано во время остановки. Например, можно даже выполнять сетевые запросы. Однако важно помнить, что остановки обрабатываются последовательно, поэтому рекомендуется делать их короткими. Дополнительную информацию можно найти в разделе «Обработка остановок».

ИК-датчик

Этот код читает триггеры ИК-датчика и отправляет HTTP-запрос для каждого триггера, в данном случае для установки Domoticz. Когда постоянно обнаруживается движение, датчик PIR поддерживает высокий уровень вывода, и в этом случае код будет продолжать отправлять HTTP-запросы каждые 10 секунд (настраивается с помощью переменной hold_time).

Main (main.py)

import time

from network import WLAN

from machine import Pin

from domoticz import Domoticz

wl = WLAN(WLAN.STA)

d = Domoticz("", 8080 ,"")

#config

hold_time_sec = 10

#flags

last_trigger = -10

pir = Pin(′G4′,mode=Pin.IN, pull=Pin.PULL_UP)

# main loop

print("Starting main loop")

while True:

    if pir() == 1:

        if time.time() - last_trigger > hold_time_sec:

            last_trigger = time.time()

            print("Presence detected, sending HTTP request")

            try:

                return_code = d.setVariable(′Presence:LivingRoom′,′1′)

                print("Request result: "+str(return_code))

            except Exception as e:

                print("Request failed")

                print(e)

    else:

        last_trigger = 0

        print("No presence")

 

    time.sleep_ms(500)

 

print("Exited main loop")

 

Boot (boot.py)

Дополнительные скрипты WiFi смотрите в пошаговом руководстве wlan.

import os

import machine

 

uart = machine.UART(0, 115200)

os.dupterm(uart)

 

known_nets = {

    ′NetworkID′:        {′pwd′: ′′, ′wlan_config′:  (′10.0.0.8′, ′255.255.0.0′, ′10.0.0.1′, ′10.0.0.1′)},

}

 

from network import WLAN

wl = WLAN()

 

 

if machine.reset_cause() != machine.SOFT_RESET:

 

    wl.mode(WLAN.STA)

    original_ssid = wl.ssid()

    original_auth = wl.auth()

 

    print("Scanning for known wifi nets")

    available_nets = wl.scan()

    nets = frozenset([e.ssid for e in available_nets])

 

    known_nets_names = frozenset([key for key in known_nets])

    net_to_use = list(nets & known_nets_names)

    try:

        net_to_use = net_to_use[0]

        net_properties = known_nets[net_to_use]

        pwd = net_properties[′pwd′]

        sec = [e.sec for e in available_nets if e.ssid == net_to_use][0]

        if ′wlan_config′ in net_properties:

            wl.ifconfig(config=net_properties[′wlan_config′])

        wl.connect(net_to_use, (sec, pwd), timeout=10000)

        while not wl.isconnected():

            machine.idle() # save power while waiting

        print("Connected to "+net_to_use+" with IP address:" + wl.ifconfig()[0])

 

    except Exception as e:

        print("Failed to connect to any known network, going into AP mode")

        wl.init(mode=WLAN.AP, ssid=original_ssid, auth=original_auth, channel=6, antenna=WLAN.INT_ANT)

 

Domoticz Wrapper (domoticz.py)

import socket

class Domoticz:

 

    def __init__(self, ip, port,  basic):

        self.basic = basic

        self.ip = ip

        self.port = port

 

    def setLight(self, idx, command):

        return self.sendRequest("type=command¶m=switchlight&idx="+idx+"&switchcmd="+command)

 

    def setVariable(self, name, value):

        return self.sendRequest("type=command¶m=updateuservariable&vtype=0&vname="+name+"&vvalue="+value)

 

    def sendRequest(self, path):

        try:

            s = socket.socket()

            s.connect((self.ip,self.port))

            s.send(b"GET /json.htm?"+path+" HTTP/1.1 Host: pycom.io Authorization: Basic "+self.basic+" ")

            status = str(s.readline(), ′utf8′)

            code = status.split(" ")[1]

            s.close()

            return code

 

        except Exception:

            print("HTTP request failed")

            return 0

 

Modbus

Modbus - это протокол обмена сообщениями, который определяет структуру пакета для передачи данных между устройствами в архитектуре ведущий / ведомый. Протокол не зависит от среды передачи и обычно передается по TCP (MODBUS TCP) или по последовательной связи (MODBUS RTU). Modbus предназначен для использования в качестве протокола запроса / ответа и предоставляет услуги, определенные кодами функций. Код функции в запросе сообщает ведомому, какое действие следует выполнить. Коды функций, которые поддерживаются устройствами, перечислены ниже.

Modbus

Для получения дополнительной информации о MODBUS RTU смотрите следующий файл PDF. Информацию о MODBUS TCP можно найти здесь.

Pycom Modbus Library

Библиотеки Python и примеры кода, поддерживающие Modbus TCP и Modbus RTU, доступны в следующем репозитории GitHub. Чтобы использовать эту библиотеку, подключитесь к целевому устройству Pycom через ftp и загрузите папку uModbus в /flash. Описание поддерживаемых кодов приведено ниже.

Чтение состояния катушки

Эта функция запрашивает состояние (ВКЛ/ВЫКЛ) дискретных катушек на удаленном устройстве. Адрес устройства, адрес первой катушки и количество катушек должны быть указаны в запросе. Адрес первой катушки равен 0, и можно прочитать не более 2000 смежных катушек. Пример кода Python показан ниже.

slave_addr=0x0A

starting_address=0x00

coil_quantity=100

 

coil_status = modbus_obj.read_coils(slave_addr, starting_address, coil_quantity)

print(′Coil status: ′ + ′ ′.join(′{:d}′.format(x) for x in coil_status))

 

Чтение состояния дискретных входов

Эта команда используется для чтения состояния (ВКЛ / ВЫКЛ) дискретных входов на удаленном устройстве. Адрес устройства, адрес первого входа и количество считываемых входов должны быть указаны в запросе. Адрес первого входа равен 0, можно прочитать до 2000 непрерывных входов. Пример кода Python показан ниже.

slave_addr=0x0A

starting_address=0x0

input_quantity=100

 

input_status = modbus_obj.read_discrete_inputs(slave_addr, starting_address, input_quantity)

print(′Input status: ′ + ′ ′.join(′{:d}′.format(x) for x in input_status))

Чтение содержимого регистров

Этот код функции используется для считывания содержимого регистров хранения аналоговых выходов. Адрес подчиненного устройства, адрес начального регистра, количество регистров для чтения и знак данных должны быть указаны. Адреса регистров начинаются с 0, и можно прочитать не более 125 непрерывных регистров.

slave_addr=0x0A

starting_address=0x00

register_quantity=100

signed=True

 

register_value = modbus_obj.read_holding_registers(slave_addr, starting_address, register_quantity, signed)

print(′Holding register value: ′ + ′ ′.join(′{:d}′.format(x) for x in register_value))

Чтение входных регистров

Эта команда используется для считывания до 125 непрерывных входных регистров на удаленном устройстве. Адрес устройства, адрес начального регистра, количество входных регистров и знак данных должны быть указаны в запросе. Адрес первых входных регистров равен 0.

slave_addr=0x0A

starting_address=0x00

register_quantity=100

signed=True

 

register_value = modbus_obj.read_input_registers(slave_addr, starting_address, register_quantity, signed)

print(′Input register value: ′ + ′ ′.join(′{:d}′.format(x) for x in register_value))

Запись одной катушки

Этот код функции используется для записи состояния дискретной катушки на удаленном устройстве. Значение 0xFF00 означает, что катушка должна быть установлена в положение ON, а значение 0x0000 означает, что катушка должна быть установлена в положение OFF. Пример кода Python для установки катушки по адресу 0x00 во включенное состояние показан ниже.

slave_addr=0x0A

output_address=0x00

output_value=0xFF00

 

return_flag = modbus_obj.write_single_coil(slave_addr, output_address, output_value)

output_flag = ′Success′ if return_flag else ′Failure′

print(′Writing single coil status: ′ + output_flag)

Запись регистра

Эта команда используется для записи содержимого регистра хранения аналогового выхода на удаленном устройстве. Адрес подчиненного устройства, адрес регистра, значение регистра и подпись данных должны быть указаны в запросе. Как и для всех других команд, адреса регистров начинаются с 0.

slave_addr=0x0A

register_address=0x01

register_value=-32768

signed=True

 

return_flag = modbus_obj.write_single_register(slave_addr, register_address, register_value, signed)

output_flag = ′Success′ if return_flag else ′Failure′

print(′Writing single coil status: ′ + output_flag)

Запись состояний нескольких катушек

Этот код функции используется для установки непрерывной последовательности катушек на удаленном устройстве в положение ВКЛ или ВЫКЛ. Адрес устройства, начальный адрес катушек и массив с состояниями катушек должны быть указаны в запросе.

slave_addr=0x0A

starting_address=0x00

output_values=[1,1,1,0,0,1,1,1,0,0,1,1,1]

 

return_flag = modbus_obj.write_multiple_coils(slave_addr, starting_address, output_values)

output_flag = ′Success′ if return_flag else ′Failure′

print(′Writing multiple coil status: ′ + output_flag)

Запись нескольких регистров

Эта команда используется для записи содержимого непрерывной последовательности аналоговых регистров на удаленном устройстве. Адрес подчиненного устройства, адрес начального регистра, значения регистров и подпись данных должны быть указаны в запросе. Адрес первого регистра равен 0, можно записать максимум 125 значений регистра. Пример кода Python показан ниже.

slave_addr=0x0A

register_address=0x01

register_values=[2, -4, 6, -256, 1024]

signed=True

 

return_flag = modbus_obj.write_multiple_registers(slave_addr, register_address, register_values, signed)

output_flag = ′Success′ if return_flag else ′Failure′

print(′Writing multiple register status: ′ + output_flag)

Информация представленная на данном информационном ресурсе преследует исключительно рекламные цели и не является договором-офертой !
© Все права защищены 2021г https://compacttool.ru