Модули Pycom имеют возможность обновлять прошивку устройства в процессе работы, мы называем это «беспроводным» (OTA) обновлением. Библиотека pycom предоставляет несколько функций для достижения этой цели. Этот пример продемонстрирует, как вы можете использовать эту функцию для обновления устройств. Полный исходный код этого примера можно найти здесь.
Представьте, что вы - компания, занимающаяся интеллектуальными измерениями, и вы хотите выпустить обновление для своего интеллектуального счетчика на основе Pycom. Эти счетчики обычно отправляют данные обратно через LoRa. К сожалению, сообщения нисходящей линии связи LoRa имеют очень ограниченный размер, и для загрузки полного образа прошивки потребуется несколько сотен, если не тысяч. Чтобы обойти это ограничение, ваши устройства могут отправлять свои обычные данные через LoRa, а когда они получают специальную команду через сообщение нисходящей линии связи, устройства подключаются к сети WiFi.
Невозможно попросить клиентов разрешить вашему устройству подключаться к их домашней сети, поэтому вместо этого эта сеть может быть предоставлена транспортным средством. Это транспортное средство будет перемещаться по определенной географической области, в которую устройствам было отправлено специальное сообщение нисходящей линии связи, чтобы инициировать обновление. Устройства будут искать сеть Wi-Fi, передаваемую транспортным средством, и подключаться. Затем устройства подключатся к серверу, работающему в этой сети WiFi. Этот сервер (также показанный в этом примере) будет генерировать файлы манифеста, которые инструктируют устройство о том, что оно должно обновлять и откуда получать данные обновления.
Код доступен здесь.
Этот скрипт запускает HTTP-сервер на порт 8000, который обеспечивает обновление по беспроводной сети (OTA) в формате JSON, а также предоставляет содержимое обновления. Этот скрипт должен выполняться в каталоге, который содержит каждую версию кода конечных устройств, в следующей структуре:
- server directory
|- this_script.py
|- 1.0.0
| |- flash
| | |- lib
| | | |- lib_a.py
| | |- main.py
| | |- boot.py
| |- sd
| |- some_asset.txt
| |- asset_that_will_be_removed.wav
|- 1.0.1
| |- flash
| | |- lib
| | | |- lib_a.py
| | | |- new_lib.py
| | |- main.py
| | |- boot.py
| |- sd
| |- some_asset.txt
|- firmware_1.0.0.bin
|- firmware_1.0.1.bin
Каталог верхнего уровня, содержащий этот скрипт, может содержать одно из двух:
Как только каталог будет настроен, как описано выше, вам просто нужно запустить этот скрипт, используя python3. После запуска скрипт запустит HTTP-сервер на порт 8000 (его можно изменить, изменив переменную PORT). Этот сервер будет обслуживать все файлы в каталоге вместе с одним дополнительным специальным файлом manifest.json. Этот файл не существует в файловой системе, но вместо этого создается по запросу и содержит необходимые изменения для перевода конечного устройства из его текущей версии в последнюю доступную версию. Пример:
http://127.0.0.1:8000/manifest.json?current_ver=1.0.0
В поле current_ver в конце URL-адреса должна быть указана текущая версия прошивки конечного устройства. Сгенерированный манифест будет содержать списки файлов, которые являются новыми, изменены или должны быть удалены вместе с хэш-файлами SHA1. Ниже приведен пример того, как может выглядеть такой манифест:
{
"delete": [
"flash/old_file.py",
"flash/other_old_file.py"
],
"firmware": {
"URL": "http://192.168.1.144:8000/firmware_1.0.1b.bin",
"hash": "ccc6914a457eb4af8855ec02f6909316526bdd08"
},
"new": [
{
"URL": "http://192.168.1.144:8000/1.0.1b/flash/lib/new_lib.py",
"dst_path": "flash/lib/new_lib.py",
"hash": "1095df8213aac2983efd68dba9420c8efc9c7c4a"
}
],
"update": [
{
"URL": "http://192.168.1.144:8000/1.0.1b/flash/changed_file.py",
"dst_path": "flash/changed_file.py",
"hash": "1095df8213aac2983efd68dba9420c8efc9c7c4a"
}
],
"version": "1.0.1b"
}
Манифест содержит следующие поля:
Примечание. Номер версии файлов может не совпадать с прошивкой. Максимально доступный номер версии, превышающий текущую версию клиента, используется как для прошивки, так и для файлов.
Чтобы URL-адреса были правильно отформатированы, вам необходимо отправить заголовок «host» вместе с запросом HTTP get, например:
GET /manifest.json?current_ver=1.0.0 HTTP/1.0 Host: 192.168.1.144:8000
Библиотека MicroPyton для взаимодействия с сервером, описанным выше, доступна здесь.
Эта библиотека разделена на два слоя. Класс OTA верхнего уровня реализует все функциональные возможности высокого уровня, такие как анализ файла JSON, создание резервных копий обновляемых файлов в случае сбоя и т. Д. Слой библиотеки не зависит от выбранного вами транспортного метода. Ниже - это класс WiFiOTA. Этот класс реализует реальный транспортный механизм того, как устройство извлекает файлы и обновляет манифест (через WiFi, как предполагает название класса). Причина такого разделения заключается в том, что функциональность высокого уровня может использоваться повторно независимо от того, какой транспортный механизм вы используете. Это может быть реализовано, например, через Bluetooth, или сервер может быть изменен с HTTP на FTP.
Хотя приведенный выше код является функциональным, он предоставляется только в качестве примера того, как конечный пользователь может реализовать механизм обновления OTA. Это не финальная функция: например, несмотря на то, что она выполняет резервное копирование предыдущих версий файлов, процедура отката не реализована.
Ниже приведен пример реализации методологии, ранее описанной в этом руководстве, для инициирования обновления OTA.
Приведенный ниже пример будет работать только на устройстве Pycom с возможностями LoRa. Если вы хотите протестировать его на устройстве без функциональности LoRa, просто закомментируйте любой код, относящийся к LoRa. Оставьте только инициализацию WiFiOTA, ota.connect () и ota.update ()
from network import LoRa, WLAN
import socket
import time
from OTA import WiFiOTA
from time import sleep
import pycom
import ubinascii
from config import WIFI_SSID, WIFI_PW, SERVER_IP
# Включен зеленый светодиод
pycom.heartbeat(False)
pycom.rgbled(0xff00)
# Настройка OTA
ota = WiFiOTA(WIFI_SSID,
WIFI_PW,
SERVER_IP, # Обновить адрес сервера
8000) # Обновить порт сервера
# Выключить WiFi для экономии энергии
w = WLAN()
w.deinit()
# Инициализаци LoRa в режиме LORAWAN
lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868)
app_eui = ubinascii.unhexlify(′70B3D57ED0008CD6′)
app_key = ubinascii.unhexlify(′B57F36D88691CEC5EE8659320169A61C′)
# подключение к сети с использованием OTAA (Over the Air Activation)
lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), timeout=0)
# подождём, пока модуль подключится к сети
while not lora.has_joined():
time.sleep(2.5)
print(′Not yet joined...′)
# создать сокет LoRa
s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
# настроить скорость передачи данных LoRaWAN
s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5)
# блокировка сокета
# ожидаем отправку данных
s.setblocking(True)
while True:
# отправка данных
s.send(bytes([0x04, 0x05, 0x06]))
# сделаем сокет неблокирующим
# (потому что если данные не получены, они будут заблокированы навсегда ...)
s.setblocking(False)
# получить любые полученные данные (если есть
data = s.recv(64)
# OTA триггер
if data == bytes([0x01, 0x02, 0x03]):
print("Performing OTA")
# Выполнить OTA
ota.connect()
ota.update()
sleep(5)
Обновления программного обеспечения OTA могут быть выполнены через FTP-сервер. Загрузите соответствующий файл .tar.gz прошивки, которую вы хотите обновить/понизить отсюда. Распакуйте файл bin приложения, например, (wipy.bin) для WiPy, переименуйте этот файл в appimg.bin. Загрузите файл appimg.bin в: /flash/sys/appimg.bin через FTP-клиент. После успешной передачи файла через FTP-сервер обновление прошивки будет завершено, и файл appimg.bin будет автоматически удален.
Для загрузки просто перезагрузите устройство с помощью кнопки сброса или команды сброса:
import machine
machine.reset()
Чтобы проверить версию вашего программного обеспечения, выполните:
import os
os.uname().release
Между версиями 1.18.2 и 1.20 изменена таблица разделов, раздел OTA_0 перемещен, потому что размер прошивки был увеличен из-за расширенных функциональных возможностей версии 1.20. По этой причине при обновлении с версии 1.18.2 до 1.20 через OTA необходимо выполнить обновление до промежуточной версии под номером 1.18.3. Промежуточная версия гарантирует, что при обновлении OTA до 1.20 таблица разделов будет содержать правильные записи, и новое изображение попадёт в нужное место.
Промежуточная версия 1.18.3 содержит следующие дополнительные шаги, когда запрашивается обновление OTA:
Примечание. Обратите внимание, что в случае сбоя при обновлении загрузчика или таблицы разделов (например, из-за потери питания) устройство не загружается и не восстанавливается, а новый образ должен быть прошит через кабель.
Когда устройство обновляется до промежуточной версии 1.18.3, оно может быть перенесено в раздел «OTA_0» в зависимости от того, откуда загружается текущее активное изображение. В этом случае при следующей загрузке оно автоматически копирует себя из «OTA_0» в раздел «Factory», чтобы подготовиться к возможности получения в будущем образа 1.20. Без этого шага, когда образ 1.20 находится в разделе «Factory» из-за его увеличенного размера, он перезапишет первые сегменты текущего встроенного программного обеспечения (которым является 1.18.3, расположено в разделе «OTA_0»).
Поэтому правильная цепочка обновлений с 1.18 до 1.20 через OTA выглядит так: 1.18.2 -> 1.18.3 -> 1.20
Возможно снижение с 1.20 до 1.18, однако целевой версией должно быть 1.18.3. Образ 1.18.3, способный определить, произошло ли понижение версии, и в этом случае он не будет выполнять шаги, описанные выше: ведет себя как образ 1.18.2r7, только с обновленной таблицей разделов и загрузчиком. Обновление до 1.20 возможно и разрешено.
Примечание. Обновление устройств с помощью зашифрованной флэш-памяти не поддерживается.
Примечание. Понижение через OTA с 1.18.3 до 1.18.2.xx не допускается, так как это может привести к сбою дальнейших операций OTA.
Примечание. Перед выполнением прошивки OTA до 1.20 убедитесь, что сценарии на устройстве обновлены (при необходимости) и совместимы с 1.20 FW, чтобы избежать исключений после обновления устройства до 1.20, что может привести к потере соединения с удаленным устройством.
Вы можете найти образы прошивок для 1.18.3 здесь:
Wipy FiPy GPy LoPy LoPy4 SiPy
Lorawan - это сетевой протокол с низким энергопотреблением (LPWA), который соединяет миллионы устройств по всему миру. Эта технология обеспечивает безопасную связь на больших расстояниях, однако при гораздо более низких скоростях передачи данных, чем традиционные сети, такие как Wi-Fi. В этом уроке мы покажем, как выполнить быстрое, надежное беспроводное обновление (OTA) текстовых файлов (.py, .js, .txt и т.д.), находящихся во флэш-памяти.
Этот проект основан на архитектуре клиент-сервер. Серверы loraserver и mosquitto MQTT используются для передачи данных между приложением обновления и подключенными устройствами. Программа обновления взаимодействует с MQTT loraserver с помощью клиента paho-mqtt. Библиотеки Python и пример кода для средства OTA обновления доступны в следующем репозитории Github.
name="Your region" E.g "EU_863_870" for Europe
rx_window=2
rx2_dr="Choose one with a payload of 200 bytes or more" E.g 5 for Europe
Пример файла конфигурации с описанием параметров можно найти здесь. Для файлов конфигурации lora-app-server, lora-gateway-bridge и lora-geo-server можно оставить значения по умолчанию.
$ sudo docker-compose up
На вкладке «Join» установите флажок «Device supports OTAA», иначе автоматически будет выбран ABP. Если вы предпочитаете ABP, вам необходимо заполнить параметры задержки RX1, скорости передачи данных RX2 и частоты канала RX2.
На вкладке класса C установите флажок «Устройство поддерживает класс C».
Нажмите на только что созданное приложение и на вкладке «Devices» выберите «Create». Выберите имя, описание и EUI устройства. В выпадающем меню выберите профиль устройства, созданный на предыдущих этапах.
На вкладке ключей создайте ключи приложения.
$ python3 -V
$ pip3 install paho-mqtt
Серверные скрипты должны выполняться в каталоге, который содержит каждую версию кода конечных устройств, в следующей структуре:
- server directory
|- server OTA python scripts
|- 1.0.0 (client code)
| |- flash
| | |- lib
| | |- other text / scripts
| | |- main.py
| | |- boot.py
|- 1.0.1 (client code)
| |- flash
| | |- lib
| | |- main.py
| | |- boot.py
| | |- other text / scripts
| | |- new scripts.py
$ python3 updaterService.py
Библиотека MicroPyton для взаимодействия с сервером, описанным выше, доступна здесь.
Пример скрипта main.py показан ниже:
from loranet import LoraNet
from ota import LoraOTA
from network import LoRa
import machine
import utime
def main():
LORA_FREQUENCY = 868100000
LORA_NODE_DR = 5
LORA_REGION = LoRa.EU868
LORA_DEVICE_CLASS = LoRa.CLASS_C
LORA_ACTIVATION = LoRa.OTAA
LORA_CRED = (′my dev_eui′, ′my app_eui′, ′my app_key′)
lora = LoraNet(LORA_FREQUENCY, LORA_NODE_DR, LORA_REGION, LORA_DEVICE_CLASS, LORA_ACTIVATION, LORA_CRED)
lora.connect()
ota = LoraOTA(lora)
while True:
rx = lora.receive(256)
if rx:
print(′Received user message: {}′.format(rx))
utime.sleep(2)
main()
Подробную информацию об этом классе можно найти в RMT.
Периферийное устройство RMT (Remote Control) ESP32 в первую очередь предназначено для отправки и приема инфракрасных сигналов дистанционного управления, которые используют амплитудную модуляцию, но благодаря конструкции его можно использовать для генерации сигналов различных типов.
RMT имеет 7 каналов, 5 из которых доступны и могут быть привязаны к любому выводу GPIO (Примечание: выводы P13 -P18 могут использоваться только в качестве входов).
Следующие примеры создают объект RMT на канале 4, настраивают его для передачи и отправляют некоторые данные в различных формах. Разрешение канала 4 составляет 1000 наносекунд.
В этом первом примере мы определяем сигнал как кортеж двоичных значений, которые определяют форму желаемого сигнала вместе с длительностью бита.
from machine import RMT
# Сопоставляет RMT канал 4 с P21, когда RMT бездействует - будет вывод LOW
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
# Создаёт шаблон данных по каждому биту
# длительность * разрешающая способность канала = 10000 * 1000нс = 10мс
data = (1,0,1,1,1,0,1,0,1)
duration = 10000
rmt.pulses_send(duration, data)
В этом примере мы определяем сигнал по кортежу длительности и состояния, в котором начинается сигнал.
from machine import RMT
# Сопоставляет RMT канал 4 с P21, когда RMT бездействует - будет вывод LOW
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
# Список длительностей каждого импульса в единицах каналов # разрешающая способность
# длительность = желаемая длительность импульса / разрешение канала
duration = (8000,11000,8000,11000,6000,13000,6000,3000,8000)
# `start_level` определяет, начинается ли сигнал как НИЗКИЙ или ВЫСОКИЙ, затем переключает состояние
rmt.pulses_send(duration, start_level=RMT.HIGH)
Третий пример представляет собой комбинацию двух вышеуказанных стилей определения сигнала. Каждый импульс имеет определенные продолжительность и состояние. Это полезно, если вы не хотите, чтобы сигнал переключал состояние.
from machine import RMT
# Сопоставляет RMT канал 4 с P21, когда RMT бездействует - будет вывод LOW
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
# Создаёт шаблон данных по каждому биту
# длительность * разрешающая способность канала = длительность * 1000нс
data = (1,0,1,1,0,1)
duration = (400,200,100,300,200,400)
rmt.pulses_send(duration, data)
В следующем примере создается объект RMT на канале 4 и показана его настройка для передачи с несущей модуляцией.
from machine import RMT
rmt = RMT(channel=4,
gpio="P21",
tx_idle_level=RMT.LOW,
# Несущая = 100 Гц – это 80%, модулирует ВЫСОКИЕ сигналы
tx_carrier = (100, 70, RMT.HIGH))
data = (1,0,1)
duration = 10000
rmt.pulses_send(duration, data)
В следующем примере создается объект RMT на канале 2, настраивается для приема, а затем идёт ожидание непрерывных импульсов без тайм-аута
from machine import RMT
rmt = machine.RMT(channel=2)
rmt.init(gpio="P21", rx_idle_threshold=1000)
data = rmt.pulses_get()
Если tx_idle_level не установлен в противоположность третьему значению в кортеже tx_carrier, несущая волна будет продолжать генерироваться, когда канал RMT находится в режиме ожидания.
В следующем примере создается объект RMT на канале 2, настраивается для приема, затем идёт ожидание максимум 1000 мсек до первого импульса.
from machine import RMT
# Устанавливает RMT канал 2 на P21 и устанавливает максимальную длину действительного импульса на 1000 * разрешение канала = 1000 * 100 нс = 100 мкс
rmt = machine.RMT(channel=2, gpio="P21", rx_idle_threshold=1000)
rmt.init()
# Получает неопределенное количество импульсов, ожидая максимум 500 мсек до первого импульса (в отличие от других мест, где абсолютная продолжительность была основана на RMT разрешении каналов) до импульса длительностью больше, чем rx_idle_threshold.data = rmt.pulses_get(timeout=500)
В следующем примере создается объект RMT на канале 2, настраивается для приема, отфильтровываются импульсы с шириной < 20*100 нано секунд, затем ожидается 100 импульсов
from machine import RMT
rmt = machine.RMT(channel=2, # Resolution = 100ns
gpio="P21",
# Самый длинный импульс = 1000 * 100 нс = 100 мкс rx_idle_threshold=1000,
# Отфильтровывает импульсы короче 20*100 нс = 2 мкс
rx_filter_threshold=20)
# Получает 100 импульсов, импульсы короче 2 мкс или дольше 100 мкс будут проигнорированы. Это означает, что если он получит 80 действительных импульсов, но затем сигнал не меняется в течение 10 часов, а затем происходит еще 20 импульсов, эта функция будет ждать 10 часов
data = rmt.pulses_get(pulses=100)
В следующем примере настраивается сервер, который может принимать 5 подключений параллельно, создавать новый поток для каждого подключенного клиента, получать данные и отправлять обратно, а затем закрывать сокет.
import _thread
import time
# Поток для обработки клиента
def client_thread(clientsocket,n):
# Полечение максимум 12 байт от каждого клиента
r = clientsocket.recv(12)
# Если recv() возвращает 0, соединение закрывается
if len(r) == 0:
clientsocket.close()
return
else:
# Сделаем что-нибудь с полученными данными
print("Received: {}".format(str(r)))
# Отправим данные обратно
clientsocket.send(str(n))
# Закрытие сокета
clientsocket.close()
# Настройка
serversocket = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
serversocket.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
serversocket.bind(("192.168.0.249", 6543))
# Допускается максимум 5 соединений одновременно
serversocket.listen(5)
# Отправка уникальных данных
c = 0
while True:
# Разрешить соединение
(clientsocket, address) = serversocket.accept()
# Новый поток
_thread.start_new_thread(client_thread, (clientsocket, c))
c = c+1
В следующем примере настраивается клиент, который может подключаться к серверу с 2 неблокирующими сокетами, создавать новый поток для обработки неблокирующих сокетов.
import socket
import _thread
import time
import uerrno
import uselect
def socket_thread(p):
while True:
# Ждём, пока какое-либо действие будет бесконечным
l = p.poll()
# Начало обработки произошедших действий
for t in l:
# Первым элементом возвращаемого кортежа является сам сокет
sock = t[0]
# Второй элемент возвращаемого кортежа – произошедшие события
event = t[1]
# Если произошла какая-либо ошибка или потеря соединения, сокет закрывается if(event & uselect.POLLERR or event & uselect.POLLHUP):
sock.close()
continue
# Если сокет доступен для записи, отправьте некоторые данные
# Сокет становится доступным для записи, если connect () был успешным
if(event & uselect.POLLOUT):
# Если во время отправки возникнет какая-либо ошибка, poll () вернется с событием error – закрытие сокета
try:
sock.send("Data to send")
# Мы отправим только одно сообщение на этот сокет, в будущем - только входящие сообщения
p.modify(sock, uselect.POLLIN | uselect.POLLHUP | uselect.POLLERR)
except:
pass
# Если получены данные, примем их
if(event & uselect.POLLIN):
# Если при получении возникнет ошибка, poll () вернется с error – закрытие сокета
try:
r = sock.recv(1)
# Если recv () возвращается с 0, значит произошло закрытие соединения if len(r) == 0:
sock.close()
continue
else:
# Сделаем что-нибудь с полученными данными
print("Data received: " + str(r))
except:
pass
# Список для хранения наших сокетов
socket_list = []
# Установка первого сокета в неблокирующий режим
s1 = socket.socket()
s1.setblocking(False)
socket_list.append(s1)
# Установка второго сокета в неблокирующий режим
s2 = socket.socket()
s2.setblocking(False)
socket_list.append(s2)
# Создать новый объект опроса
p = uselect.poll()
# Зарегистрируйте сокеты в объекте poll, дождитесь всех видов событий
p.register(s1, uselect.POLLIN | uselect.POLLOUT | uselect.POLLHUP | uselect.POLLERR)
p.register(s2, uselect.POLLIN | uselect.POLLOUT | uselect.POLLHUP | uselect.POLLERR)
# Попробуйте подключиться к серверу с каждым сокетом
for s in socket_list:
try:
s.connect(socket.getaddrinfo("192.168.0.234", 6543)[0][-1])
except OSError as e:
# В случае неблокирующего сокета connect () ожидаемо вызывает EINPROGRESS if e.args[0] != uerrno.EINPROGRESS:
raise
# Запуск потока, который заботится о неблокирующих сокетах через созданный объект
_thread.start_new_thread(socket_thread, (p,))
import socket
import ssl
import _thread
import time
import uerrno
import uselect
# Вспомогательная функция для выполнения рукопожатия
def handshake_helper(sock):
while True:
time.sleep_ms(1)
try:
# Выполнение рукопожатия
sock.do_handshake()
return
except ssl.SSLError as e:
# поднять любые другие ошибки, кроме TIMEOUT
if e.args[0] != ssl.SSL_TIMEOUT:
raise
def socket_thread(p):
while True:
# Ждём бесконечное действие
l = p.poll()
# Начать обработку произошедших действий
for t in l:
# Первым элементом возвращаемого кортежа является сам сокет
sock = t[0]
# Второй элемент возвращаемого кортежа – произошедшие события
event = t[1]
# Если произошла какая-либо ошибка или потеря соединения – сокет закрывается
if(event & uselect.POLLERR or event & uselect.POLLHUP):
p.unregister(sock)
sock.close()
# Если переданы данные – примите их
elif(event & uselect.POLLIN):
# Если при получении возникнет ошибка, poll () вернется с событием error – закрытие сокета
try:
r = sock.recv(1)
# Если recv() вернёт 0 – приозошло закрытие соединения
if len(r) == 0:
p.unregister(sock)
sock.close()
else:
# Сделаем что-нибудь с полученными данными
print("Data received: " + str(r))
except:
pass
# Если сокет доступен для записи, выполнить рукопожатие и отправить некоторые данные
elif(event & uselect.POLLOUT):
try:
# Выполнение рукопожатия SSL
handshake_helper(sock)
except:
# Произошла ошибка, закройте сокет, отмените регистрацию
p.unregister(sock)
sock.close()
# Если во время отправки возникнет ошибка, poll () вернется с событием error – закрытие сокета
try:
sock.send("Data to send")
# Мы отправим только одно сообщение на этот сокет, в будущем - только входящие сообщения
p.modify(sock, uselect.POLLIN | uselect.POLLHUP | uselect.POLLERR)
except:
pass
# Создание сокета
s = socket.socket()
ssl_socket = ssl.wrap_socket(s, keyfile="/flash/cert/my_private_key.pem",
certfile="/flash/cert/my_public_key.pem",
server_side=False,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs="/flash/cert/my_CA_cert.pem"
)
# Установите сокет в неблокирующее состояние
ssl_socket.setblocking(False)
# Создать новый объект
p = uselect.poll()
# Зарегистрируйте сокет в объекте, ожидайте событий write и error
p.register(ssl_socket, uselect.POLLOUT | uselect.POLLHUP | uselect.POLLERR)
# Попробуйте подключиться к серверу
try:
ssl_socket.connect(socket.getaddrinfo("192.168.0.234", 6543)[0][-1])
except OSError as e:
# В случае неблокирующего сокета connect () ожидаемо вызывает EINPROGRESS
if e.args[0] != uerrno.EINPROGRESS:
# Поднимите все другие ошибки
raise
# Запустить поток, контролирующий неблокирующий сокет через созданный объект
_thread.start_new_thread(socket_thread, (p))
Пример использования класса:
from machine import Touch
from machine import Timer
import time
class TouchPad:
def __init__(self, pin, name):
self.touch = Touch(pin)
self.last_press = 0
self.name = name
self.pressed = False
def ispressed(self):
if self.touch.read() < (self.touch.init_value() * 2 / 3):
self.pressed = True
else:
self.pressed = False
return self.pressed
def just_pressed(self):
now = time.ticks_ms()
if now - self.last_press < 500:
return True
else:
return False
def set_press(self):
self.last_press = time.ticks_ms()
class TouchController:
def __init__(self, pads):
self.pads = pads
for pad in self.pads:
pad.touch.init_value(1500)
def check(self, alarm):
for pad in self.pads:
if pad.ispressed() and not pad.just_pressed():
pad.set_press()
if pad.name == ′Right′:
if tleft.just_pressed():
print(′Swipe right′)
else:
print(′{} pressed′.format(pad.name))
elif pad.name == ′Left′:
if tright.just_pressed():
print(′Swipe left′)
else:
г. Москва, Пятницкое ш. д. 18, пав. 566
zakaz@compacttool.ru
8-495-752-55-22
Информация представленная на данном информационном ресурсе преследует исключительно рекламные цели и не является договором-офертой !
© Все права защищены 2015 - 2024г https://compacttool.ru