Каталог товаров

Обновление OTA

Обзор

Модули Pycom имеют возможность обновлять прошивку устройства в процессе работы, мы называем это «беспроводным» (OTA) обновлением. Библиотека pycom предоставляет несколько функций для достижения этой цели. Этот пример продемонстрирует, как вы можете использовать эту функцию для обновления устройств. Полный исходный код этого примера можно найти здесь.

Метод А

Представьте, что вы - компания, занимающаяся интеллектуальными измерениями, и вы хотите выпустить обновление для своего интеллектуального счетчика на основе Pycom. Эти счетчики обычно отправляют данные обратно через LoRa. К сожалению, сообщения нисходящей линии связи LoRa имеют очень ограниченный размер, и для загрузки полного образа прошивки потребуется несколько сотен, если не тысяч. Чтобы обойти это ограничение, ваши устройства могут отправлять свои обычные данные через LoRa, а когда они получают специальную команду через сообщение нисходящей линии связи, устройства подключаются к сети WiFi.

Невозможно попросить клиентов разрешить вашему устройству подключаться к их домашней сети, поэтому вместо этого эта сеть может быть предоставлена транспортным средством. Это транспортное средство будет перемещаться по определенной географической области, в которую устройствам было отправлено специальное сообщение нисходящей линии связи, чтобы инициировать обновление. Устройства будут искать сеть Wi-Fi, передаваемую транспортным средством, и подключаться. Затем устройства подключатся к серверу, работающему в этой сети WiFi. Этот сервер (также показанный в этом примере) будет генерировать файлы манифеста, которые инструктируют устройство о том, что оно должно обновлять и откуда получать данные обновления.

Сервер

Код доступен здесь.

Этот скрипт запускает HTTP-сервер на порт 8000, который обеспечивает обновление по беспроводной сети (OTA) в формате JSON, а также предоставляет содержимое обновления. Этот скрипт должен выполняться в каталоге, который содержит каждую версию кода конечных устройств, в следующей структуре:

  - server directory

    |- this_script.py

    |- 1.0.0

    |  |- flash

    |  |   |- lib

    |  |   |  |- lib_a.py

    |  |   |- main.py

    |  |   |- boot.py

    |  |- sd

    |     |- some_asset.txt

    |     |- asset_that_will_be_removed.wav

    |- 1.0.1

    |  |- flash

    |  |   |- lib

    |  |   |  |- lib_a.py

    |  |   |  |- new_lib.py

    |  |   |- main.py

    |  |   |- boot.py

    |  |- sd

    |     |- some_asset.txt

    |- firmware_1.0.0.bin

    |- firmware_1.0.1.bin

 

Каталог верхнего уровня, содержащий этот скрипт, может содержать одно из двух:

  • Обновление директории: названия должны быть названы с совместимым номером версии в соответствии со схемой версионирования Python LooseVersion. (Http://epydoc.sourceforge.net/stdlib/distutils.version.LooseVersion-class.html). Они должны содержать всю файловую систему конечного устройства для соответствующего номера версии.
  • Прошивка: файлы должны быть названы в формате firmare_VERSION.bin, где VERSION - это номер версии, совместимый со схемой версионирования Python LooseVersion (http://epydoc.sourceforge.net/stdlib/distutils.version.LooseVersion-class.html). , этот файл должен быть в формате appimg.bin.

Как пользоваться

Как только каталог будет настроен, как описано выше, вам просто нужно запустить этот скрипт, используя python3. После запуска скрипт запустит HTTP-сервер на порт 8000 (его можно изменить, изменив переменную PORT). Этот сервер будет обслуживать все файлы в каталоге вместе с одним дополнительным специальным файлом manifest.json. Этот файл не существует в файловой системе, но вместо этого создается по запросу и содержит необходимые изменения для перевода конечного устройства из его текущей версии в последнюю доступную версию. Пример:

http://127.0.0.1:8000/manifest.json?current_ver=1.0.0

В поле current_ver в конце URL-адреса должна быть указана текущая версия прошивки конечного устройства. Сгенерированный манифест будет содержать списки файлов, которые являются новыми, изменены или должны быть удалены вместе с хэш-файлами SHA1. Ниже приведен пример того, как может выглядеть такой манифест:

 

{

   "delete": [

      "flash/old_file.py",

      "flash/other_old_file.py"

   ],

   "firmware": {

       "URL": "http://192.168.1.144:8000/firmware_1.0.1b.bin",

       "hash": "ccc6914a457eb4af8855ec02f6909316526bdd08"

   },

   "new": [

       {

           "URL": "http://192.168.1.144:8000/1.0.1b/flash/lib/new_lib.py",

           "dst_path": "flash/lib/new_lib.py",

           "hash": "1095df8213aac2983efd68dba9420c8efc9c7c4a"

       }

   ],

   "update": [

       {

           "URL": "http://192.168.1.144:8000/1.0.1b/flash/changed_file.py",

           "dst_path": "flash/changed_file.py",

           "hash": "1095df8213aac2983efd68dba9420c8efc9c7c4a"

       }

   ],

   "version": "1.0.1b"

}

 

Манифест содержит следующие поля:

  • delete: список путей к файлам, которые больше не нужны
  • firmware: URL и хэш SHA1 образа прошивки
  • new: URL, путь на конечном устройстве и SHA1 всех новых файлов
  • update: URL, путь к конечному устройству и хэш SHA1 всех файлов, которые существовали раньше, но были изменены.
  • version: номер версии, до которой этот манифест будет обновлять клиента
  • previous_version: версия, на которой в данный момент находится клиент перед применением обновления

Примечание. Номер версии файлов может не совпадать с прошивкой. Максимально доступный номер версии, превышающий текущую версию клиента, используется как для прошивки, так и для файлов.

Чтобы URL-адреса были правильно отформатированы, вам необходимо отправить заголовок «host» вместе с запросом HTTP get, например:

GET /manifest.json?current_ver=1.0.0 HTTP/1.0 Host: 192.168.1.144:8000

Библиотека клиента

Библиотека MicroPyton для взаимодействия с сервером, описанным выше, доступна здесь.

Эта библиотека разделена на два слоя. Класс OTA верхнего уровня реализует все функциональные возможности высокого уровня, такие как анализ файла JSON, создание резервных копий обновляемых файлов в случае сбоя и т. Д. Слой библиотеки не зависит от выбранного вами транспортного метода. Ниже - это класс WiFiOTA. Этот класс реализует реальный транспортный механизм того, как устройство извлекает файлы и обновляет манифест (через WiFi, как предполагает название класса). Причина такого разделения заключается в том, что функциональность высокого уровня может использоваться повторно независимо от того, какой транспортный механизм вы используете. Это может быть реализовано, например, через Bluetooth, или сервер может быть изменен с HTTP на FTP.

Хотя приведенный выше код является функциональным, он предоставляется только в качестве примера того, как конечный пользователь может реализовать механизм обновления OTA. Это не финальная функция: например, несмотря на то, что она выполняет резервное копирование предыдущих версий файлов, процедура отката не реализована.

Пример

Ниже приведен пример реализации методологии, ранее описанной в этом руководстве, для инициирования обновления OTA.

Приведенный ниже пример будет работать только на устройстве Pycom с возможностями LoRa. Если вы хотите протестировать его на устройстве без функциональности LoRa, просто закомментируйте любой код, относящийся к LoRa. Оставьте только инициализацию WiFiOTA,  ota.connect () и ota.update ()

from network import LoRa, WLAN

import socket

import time

from OTA import WiFiOTA

from time import sleep

import pycom

import ubinascii

 

from config import WIFI_SSID, WIFI_PW, SERVER_IP

 

# Включен зеленый светодиод

pycom.heartbeat(False)

pycom.rgbled(0xff00)

 

# Настройка OTA

ota = WiFiOTA(WIFI_SSID,

              WIFI_PW,

              SERVER_IP,  # Обновить адрес сервера

              8000)  # Обновить порт сервера

 

# Выключить WiFi для экономии энергии

w = WLAN()

w.deinit()

 

# Инициализаци LoRa в режиме LORAWAN

lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868)

 

app_eui = ubinascii.unhexlify(′70B3D57ED0008CD6′)

app_key = ubinascii.unhexlify(′B57F36D88691CEC5EE8659320169A61C′)

 

# подключение к сети с использованием OTAA (Over the Air Activation)

lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), timeout=0)

 

# подождём, пока модуль подключится к сети

while not lora.has_joined():

    time.sleep(2.5)

    print(′Not yet joined...′)

 

# создать сокет LoRa 

s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)

 

# настроить скорость передачи данных LoRaWAN 

s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5)

 

# блокировка сокета

# ожидаем отправку данных

s.setblocking(True)

 

while True:

    # отправка данных

    s.send(bytes([0x04, 0x05, 0x06]))

 

    # сделаем сокет неблокирующим

    # (потому что если данные не получены, они будут заблокированы навсегда ...)

    s.setblocking(False)

 

    # получить любые полученные данные (если есть

    data = s.recv(64)

 

    # OTA триггер

    if data == bytes([0x01, 0x02, 0x03]):

        print("Performing OTA")

        # Выполнить OTA

        ota.connect()

        ota.update()

 

    sleep(5)

 

Метод Б

Обновления программного обеспечения OTA могут быть выполнены через FTP-сервер. Загрузите соответствующий файл .tar.gz прошивки, которую вы хотите обновить/понизить отсюда. Распакуйте файл bin приложения, например, (wipy.bin) для WiPy, переименуйте этот файл в appimg.bin. Загрузите файл appimg.bin в: /flash/sys/appimg.bin через FTP-клиент. После успешной передачи файла через FTP-сервер обновление прошивки будет завершено, и файл appimg.bin будет автоматически удален.

Для загрузки просто перезагрузите устройство с помощью кнопки сброса или команды сброса:

import machine

machine.reset()

Чтобы проверить версию вашего программного обеспечения, выполните:

import os

os.uname().release

OTA обновление с версии 1.18 до 1.20

Между версиями 1.18.2 и 1.20 изменена таблица разделов, раздел OTA_0 перемещен, потому что размер прошивки был увеличен из-за расширенных функциональных возможностей версии 1.20. По этой причине при обновлении с версии 1.18.2 до 1.20 через OTA необходимо выполнить обновление до промежуточной версии под номером 1.18.3. Промежуточная версия гарантирует, что при обновлении OTA до 1.20 таблица разделов будет содержать правильные записи, и новое изображение попадёт в нужное место.

Промежуточная версия 1.18.3 содержит следующие дополнительные шаги, когда запрашивается обновление OTA:

  • Обновляет загрузчик
  • Перемещает содержимое раздела «ota data» на новое место
  • Обновляет таблицу разделов

Примечание. Обратите внимание, что в случае сбоя при обновлении загрузчика или таблицы разделов (например, из-за потери питания) устройство не загружается и не восстанавливается, а новый образ должен быть прошит через кабель.

Когда устройство обновляется до промежуточной версии 1.18.3, оно может быть перенесено в раздел «OTA_0» в зависимости от того, откуда загружается текущее активное изображение. В этом случае при следующей загрузке оно автоматически копирует себя из «OTA_0» в раздел «Factory», чтобы подготовиться к возможности получения в будущем образа 1.20. Без этого шага, когда образ 1.20 находится в разделе «Factory» из-за его увеличенного размера, он перезапишет первые сегменты текущего встроенного программного обеспечения (которым является 1.18.3, расположено в разделе «OTA_0»).

Поэтому правильная цепочка обновлений с 1.18 до 1.20 через OTA выглядит так: 1.18.2 -> 1.18.3 -> 1.20

Возможно снижение с 1.20 до 1.18, однако целевой версией должно быть 1.18.3. Образ 1.18.3, способный определить, произошло ли понижение версии, и в этом случае он не будет выполнять шаги, описанные выше: ведет себя как образ 1.18.2r7, только с обновленной таблицей разделов и загрузчиком. Обновление до 1.20 возможно и разрешено.

Примечание. Обновление устройств с помощью зашифрованной флэш-памяти не поддерживается.

Примечание. Понижение через OTA с 1.18.3 до 1.18.2.xx не допускается, так как это может привести к сбою дальнейших операций OTA.

Примечание. Перед выполнением прошивки OTA до 1.20 убедитесь, что сценарии на устройстве обновлены (при необходимости) и совместимы с 1.20 FW, чтобы избежать исключений после обновления устройства до 1.20, что может привести к потере соединения с удаленным устройством.

Вы можете найти образы прошивок для 1.18.3 здесь:

Wipy FiPy GPy LoPy LoPy4 SiPy

OTA обновление Lorawan 

Lorawan - это сетевой протокол с низким энергопотреблением (LPWA), который соединяет миллионы устройств по всему миру. Эта технология обеспечивает безопасную связь на больших расстояниях, однако при гораздо более низких скоростях передачи данных, чем традиционные сети, такие как Wi-Fi. В этом уроке мы покажем, как выполнить быстрое, надежное беспроводное обновление (OTA) текстовых файлов (.py, .js, .txt и т.д.), находящихся во флэш-памяти.

Начинаем

Этот проект основан на архитектуре клиент-сервер. Серверы loraserver и mosquitto MQTT используются для передачи данных между приложением обновления и подключенными устройствами. Программа обновления взаимодействует с MQTT loraserver с помощью клиента paho-mqtt. Библиотеки Python и пример кода для средства OTA обновления доступны в следующем репозитории Github.

Настройка сервера

  • Установите loraserver. В этом уроке мы установим версию Docker для Linux.
  • Перейдите в папку loraserver-docker / configuration / loraserver и откройте файл loraserver.toml. Обновите следующие региональные параметры LoraWAN:

name="Your region" E.g "EU_863_870" for Europe 

  rx_window=2

  rx2_dr="Choose one with a payload of 200 bytes or more" E.g 5 for Europe

Пример файла конфигурации с описанием параметров можно найти здесь. Для файлов конфигурации lora-app-server, lora-gateway-bridge и lora-geo-server можно оставить значения по умолчанию.

  • Выполните следующую команду, чтобы запустить loraserver:

  $ sudo docker-compose up

  • Откройте веб-браузер и введите http://localhost:8080 (https, если сертификаты настроены) для доступа к веб-интерфейсу Loraserver.
  • Войдите в loraserver, используя admin для пользователя и пароля.

lorawan

  • Теперь вы должны увидеть веб-интерфейс Loraserver, показанный ниже.

lopraserver основная

  • Нажмите на Network servers и нажмите Add. Введите имя сетевого сервера и loraserver: 8000 в network-server.

lorawan добавление сервера

  • Нажмите на Gateway-профили и нажмите Create. Дайте профилю имя, выберите каналы, и только что созданный сетевой сервер.

lorawan gateway

  • Нажмите на Gateways и нажмите Create. Выберите имя шлюза и заполните поле описания. Прочитайте Gateway ID с вашего lora Gateway и выберите только что созданный сетевой сервер и профиль. Установите флажок «Gateway discovery enabled».

lorawan создать gateway

  • Нажмите на Service-profile, а затем нажмите Create. Выберите service-profile name, выберите созданный ранее сетевой сервер и установите флажок «Add gateway meta-data».

lorawan сервис профиль

  • Нажмите на Device-profile, а затем нажмите кнопку Create. На вкладке "General" введите имя профиля устройства, выберите 1.0.2 в меню версии Lorawan Mac и выберите B в поле "Lorawan Regional parameters revision".

lorawan девайс профиль

На вкладке «Join» установите флажок «Device supports OTAA», иначе автоматически будет выбран ABP. Если вы предпочитаете ABP, вам необходимо заполнить параметры задержки RX1, скорости передачи данных RX2 и частоты канала RX2.

lorawan девайс профиль ОТА

На вкладке класса C установите флажок «Устройство поддерживает класс C».

  • Нажмите «Applications », а затем нажмите «Create». Введите название и описание приложения. Из выпадающего меню выберите сервис-профиль, который мы создали ранее.

lorawan расширения

Нажмите на только что созданное приложение и на вкладке «Devices» выберите «Create». Выберите имя, описание и EUI устройства. В выпадающем меню выберите профиль устройства, созданный на предыдущих этапах.

lorawan создать расширение

На вкладке ключей создайте ключи приложения.

lorawan ключи

  • На данном этапе необходим Python 3. Чтобы определить, установлен ли у вас Python 3, введите следующее:

  $ python3 -V

  • Установите клиент Paho mqtt, используя:

  $ pip3 install paho-mqtt

  • Загрузите программу обновления OTA из следующего репозитория Github.

Серверные скрипты должны выполняться в каталоге, который содержит каждую версию кода конечных устройств, в следующей структуре:

  - server directory

    |- server OTA python scripts

    |- 1.0.0 (client code)

    |  |- flash

    |  |   |- lib

    |  |   |- other text / scripts

    |  |   |- main.py

    |  |   |- boot.py

    |- 1.0.1 (client code)

    |  |- flash

    |  |   |- lib

    |  |   |- main.py

    |  |   |- boot.py

    |  |   |- other text / scripts

    |  |   |- new scripts.py

 

  • Обновите параметры файла конфигурации Python значениями из предыдущих шагов
  • Запустите службу OTA обновления, используя:

  $ python3 updaterService.py

Настройка клиента

Библиотека MicroPyton для взаимодействия с сервером, описанным выше, доступна здесь.

  • Откройте файл main.py и задайте региональные параметры Lorawan точно такими же, как на Gateway и Loraserver. Параметр LORA_DEVICE_CLASS должен быть класса C.

Пример скрипта main.py показан ниже:

  from loranet import LoraNet

  from ota import LoraOTA

  from network import LoRa

  import machine

  import utime

 

  def main():

      LORA_FREQUENCY = 868100000

      LORA_NODE_DR = 5

      LORA_REGION = LoRa.EU868

      LORA_DEVICE_CLASS = LoRa.CLASS_C

      LORA_ACTIVATION = LoRa.OTAA

      LORA_CRED = (′my dev_eui′, ′my app_eui′, ′my app_key′)

 

      lora = LoraNet(LORA_FREQUENCY, LORA_NODE_DR, LORA_REGION, LORA_DEVICE_CLASS, LORA_ACTIVATION, LORA_CRED)

      lora.connect()

 

      ota = LoraOTA(lora)

 

      while True:

        rx = lora.receive(256)

        if rx:

            print(′Received user message: {}′.format(rx))

 

      utime.sleep(2)

 

  main()

  • С помощью плагина pymkr загрузите код на устройство Pycom с поддержкой Lora.

RMT

Подробную информацию об этом классе можно найти в RMT.

Периферийное устройство RMT (Remote Control) ESP32 в первую очередь предназначено для отправки и приема инфракрасных сигналов дистанционного управления, которые используют амплитудную модуляцию, но благодаря конструкции его можно использовать для генерации сигналов различных типов.

RMT имеет 7 каналов, 5 из которых доступны и могут быть привязаны к любому выводу GPIO (Примечание: выводы P13 -P18 могут использоваться только в качестве входов).

RMT таблица

Передача

Следующие примеры создают объект RMT на канале 4, настраивают его для передачи и отправляют некоторые данные в различных формах. Разрешение канала 4 составляет 1000 наносекунд.

В этом первом примере мы определяем сигнал как кортеж двоичных значений, которые определяют форму желаемого сигнала вместе с длительностью бита.

from machine import RMT

# Сопоставляет RMT канал 4 с P21, когда RMT бездействует - будет вывод LOW

rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)

 

# Создаёт шаблон данных по каждому биту

# длительность * разрешающая способность канала = 10000 * 1000нс = 10мс

data = (1,0,1,1,1,0,1,0,1)

duration = 10000

rmt.pulses_send(duration, data)

RMT импульсы

В этом примере мы определяем сигнал по кортежу длительности и состояния, в котором начинается сигнал.

from machine import RMT

# Сопоставляет RMT канал 4 с P21, когда RMT бездействует - будет вывод LOW

rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)

# Список длительностей каждого импульса в единицах каналов # разрешающая способность 

#    длительность = желаемая длительность импульса / разрешение канала 

duration = (8000,11000,8000,11000,6000,13000,6000,3000,8000)

 

# `start_level` определяет, начинается ли сигнал как НИЗКИЙ или ВЫСОКИЙ, затем переключает состояние

rmt.pulses_send(duration, start_level=RMT.HIGH)

RMT импульсы 2

Третий пример представляет собой комбинацию двух вышеуказанных стилей определения сигнала. Каждый импульс имеет определенные продолжительность и состояние. Это полезно, если вы не хотите, чтобы сигнал переключал состояние.

from machine import RMT

# Сопоставляет RMT канал 4 с P21, когда RMT бездействует - будет вывод LOW

rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)

 

# Создаёт шаблон данных по каждому биту

# длительность * разрешающая способность канала = длительность * 1000нс

data = (1,0,1,1,0,1)

duration = (400,200,100,300,200,400)

rmt.pulses_send(duration, data)

RMT импульсы 3

В следующем примере создается объект RMT на канале 4 и показана его настройка для передачи с несущей модуляцией.

from machine import RMT

rmt = RMT(channel=4,

          gpio="P21",

          tx_idle_level=RMT.LOW,

          # Несущая = 100 Гц – это 80%, модулирует ВЫСОКИЕ сигналы

          tx_carrier = (100, 70, RMT.HIGH))  

data = (1,0,1)

duration = 10000

rmt.pulses_send(duration, data)

RMT импульсы 4

В следующем примере создается объект RMT на канале 2, настраивается для приема, а затем идёт ожидание непрерывных импульсов без тайм-аута

from machine import RMT

rmt = machine.RMT(channel=2)

rmt.init(gpio="P21", rx_idle_threshold=1000)

data = rmt.pulses_get()

Если tx_idle_level не установлен в противоположность третьему значению в кортеже tx_carrier, несущая волна будет продолжать генерироваться, когда канал RMT находится в режиме ожидания.

Приём

В следующем примере создается объект RMT на канале 2, настраивается для приема, затем идёт ожидание максимум 1000 мсек до первого импульса.

from machine import RMT

# Устанавливает RMT канал 2 на P21 и устанавливает максимальную длину действительного импульса на 1000 * разрешение канала = 1000 * 100 нс = 100 мкс

rmt = machine.RMT(channel=2, gpio="P21", rx_idle_threshold=1000)

rmt.init()

 

# Получает неопределенное количество импульсов, ожидая максимум 500 мсек до первого импульса (в отличие от других мест, где абсолютная продолжительность была основана на RMT разрешении каналов) до импульса длительностью больше, чем rx_idle_threshold.data = rmt.pulses_get(timeout=500)

 

В следующем примере создается объект RMT на канале 2, настраивается для приема, отфильтровываются импульсы с шириной < 20*100 нано секунд, затем ожидается 100 импульсов

from machine import RMT

 

rmt = machine.RMT(channel=2,  # Resolution = 100ns

                  gpio="P21",

                  # Самый длинный импульс = 1000 * 100 нс = 100 мкс                  rx_idle_threshold=1000,

                  # Отфильтровывает импульсы короче 20*100 нс = 2 мкс

                  rx_filter_threshold=20)

 

# Получает 100 импульсов, импульсы короче 2 мкс или дольше 100 мкс будут проигнорированы. Это означает, что если он получит 80 действительных импульсов, но затем сигнал не меняется в течение 10 часов, а затем происходит еще 20 импульсов, эта функция будет ждать 10 часов

data = rmt.pulses_get(pulses=100)

Сокеты

Настройка сервера с блокирующими сокетами

В следующем примере настраивается сервер, который может принимать 5 подключений параллельно, создавать новый поток для каждого подключенного клиента, получать данные и отправлять обратно, а затем закрывать сокет.

import _thread

import time

 

# Поток для обработки клиента

def client_thread(clientsocket,n):

    # Полечение максимум 12 байт от каждого клиента

    r = clientsocket.recv(12)

 

    # Если recv() возвращает 0, соединение закрывается

    if len(r) == 0:

        clientsocket.close()

        return

    else:

        # Сделаем что-нибудь с полученными данными

        print("Received: {}".format(str(r)))

 

    # Отправим данные обратно

    clientsocket.send(str(n))

 

    # Закрытие сокета

    clientsocket.close()

 

# Настройка

serversocket = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)

serversocket.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)

serversocket.bind(("192.168.0.249", 6543))

 

# Допускается максимум 5 соединений одновременно

serversocket.listen(5)

 

# Отправка уникальных данных

c = 0

while True:

    # Разрешить соединение

    (clientsocket, address) = serversocket.accept()

    # Новый поток

    _thread.start_new_thread(client_thread, (clientsocket, c))

    c = c+1

Использование клиента с неблокирующими сокетами

В следующем примере настраивается клиент, который может подключаться к серверу с 2 неблокирующими сокетами, создавать новый поток для обработки неблокирующих сокетов.

 

import socket

import _thread

import time

import uerrno

import uselect

 

def socket_thread(p):

 

    while True:

        # Ждём, пока какое-либо действие будет бесконечным

        l = p.poll()

 

        # Начало обработки произошедших действий

        for t in l:

            # Первым элементом возвращаемого кортежа является сам сокет

            sock = t[0]

            # Второй элемент возвращаемого кортежа – произошедшие события 

            event = t[1]

 

            # Если произошла какая-либо ошибка или потеря соединения, сокет закрывается            if(event & uselect.POLLERR or event & uselect.POLLHUP):

                sock.close()

                continue

 

            # Если сокет доступен для записи, отправьте некоторые данные

            # Сокет становится доступным для записи, если connect () был успешным

            if(event & uselect.POLLOUT):

                # Если во время отправки возникнет какая-либо ошибка, poll () вернется с событием error – закрытие сокета

                try:

                    sock.send("Data to send")

                    # Мы отправим только одно сообщение на этот сокет, в будущем - только входящие сообщения

                    p.modify(sock, uselect.POLLIN | uselect.POLLHUP | uselect.POLLERR)

                except:

                    pass

 

 

            # Если получены данные, примем их

            if(event & uselect.POLLIN):

                # Если при получении возникнет ошибка, poll () вернется с error – закрытие сокета

                try:

                    r = sock.recv(1)

                    # Если recv () возвращается с 0, значит произошло закрытие соединения                   if len(r) == 0:

                        sock.close()

                        continue

                    else:

                        # Сделаем что-нибудь с полученными данными

                        print("Data received: " + str(r))

                except:

                    pass

 

 

# Список для хранения наших сокетов

socket_list = []

 

# Установка первого сокета в неблокирующий режим

s1 = socket.socket()

s1.setblocking(False)

socket_list.append(s1)

# Установка второго сокета в неблокирующий режим

s2 = socket.socket()

s2.setblocking(False)

socket_list.append(s2)

 

# Создать новый объект опроса

 

p = uselect.poll()

# Зарегистрируйте сокеты в объекте poll, дождитесь всех видов событий

 

p.register(s1, uselect.POLLIN | uselect.POLLOUT | uselect.POLLHUP | uselect.POLLERR)

p.register(s2, uselect.POLLIN | uselect.POLLOUT | uselect.POLLHUP | uselect.POLLERR)

 

# Попробуйте подключиться к серверу с каждым сокетом

 

for s in socket_list:

    try:

        s.connect(socket.getaddrinfo("192.168.0.234", 6543)[0][-1])

    except OSError as e:

        # В случае неблокирующего сокета connect () ожидаемо вызывает EINPROGRESS        if e.args[0] != uerrno.EINPROGRESS:

  

            raise

 

# Запуск потока, который заботится о неблокирующих сокетах через созданный объект

_thread.start_new_thread(socket_thread, (p,))

Подключение к серверу с неблокирующим сокетом SSL

import socket

import ssl

import _thread

import time

import uerrno

import uselect

 

# Вспомогательная функция для выполнения рукопожатия

def handshake_helper(sock):

 

    while True:

        time.sleep_ms(1)

        try:

            # Выполнение рукопожатия

            sock.do_handshake()

            return

        except ssl.SSLError as e:

            # поднять любые другие ошибки, кроме TIMEOUT          

if e.args[0] != ssl.SSL_TIMEOUT:

                raise

 

def socket_thread(p):

 

    while True:

        # Ждём бесконечное действие

        l = p.poll()

 

        # Начать обработку произошедших действий

        for t in l:

            # Первым элементом возвращаемого кортежа является сам сокет

            sock = t[0]

            # Второй элемент возвращаемого кортежа – произошедшие события 

            event = t[1]

 

            # Если произошла какая-либо ошибка или потеря соединения – сокет закрывается

            if(event & uselect.POLLERR or event & uselect.POLLHUP):

                p.unregister(sock)

                sock.close()

 

            # Если переданы данные – примите их

            elif(event & uselect.POLLIN):

                # Если при получении возникнет ошибка, poll () вернется с событием error – закрытие сокета

                try:

                    r = sock.recv(1)

                    # Если recv() вернёт 0 – приозошло закрытие соединения

                    if len(r) == 0:

                        p.unregister(sock)

                        sock.close()

                    else:

                        # Сделаем что-нибудь с полученными данными

                        print("Data received: " + str(r))

                except:

                    pass

 

            # Если сокет доступен для записи, выполнить рукопожатие и отправить некоторые данные

            elif(event & uselect.POLLOUT):

 

                try:

                    # Выполнение рукопожатия SSL

                    handshake_helper(sock)

                except:

                    # Произошла ошибка, закройте сокет, отмените регистрацию

                    p.unregister(sock)

                    sock.close()

 

                # Если во время отправки возникнет ошибка, poll () вернется с событием error – закрытие сокета

                try:

                    sock.send("Data to send")

                    # Мы отправим только одно сообщение на этот сокет, в будущем - только входящие сообщения

                    p.modify(sock, uselect.POLLIN | uselect.POLLHUP | uselect.POLLERR)

                except:

                    pass

 

 

# Создание сокета

s = socket.socket()

ssl_socket = ssl.wrap_socket(s, keyfile="/flash/cert/my_private_key.pem",

                                certfile="/flash/cert/my_public_key.pem",

                                server_side=False,

                                cert_reqs=ssl.CERT_REQUIRED,

                                ca_certs="/flash/cert/my_CA_cert.pem"

                            )

# Установите сокет в неблокирующее состояние

ssl_socket.setblocking(False)

# Создать новый объект

p = uselect.poll()

# Зарегистрируйте сокет в объекте, ожидайте событий write и error

p.register(ssl_socket, uselect.POLLOUT | uselect.POLLHUP | uselect.POLLERR)

 

 # Попробуйте подключиться к серверу

try:

    ssl_socket.connect(socket.getaddrinfo("192.168.0.234", 6543)[0][-1])

except OSError as e:

    # В случае неблокирующего сокета connect () ожидаемо вызывает EINPROGRESS

    if e.args[0] != uerrno.EINPROGRESS:

        # Поднимите все другие ошибки

        raise

 

# Запустить поток, контролирующий неблокирующий сокет через созданный объект 

_thread.start_new_thread(socket_thread, (p))

Сенсор

Пример использования класса:

from machine import Touch

from machine import Timer

import time

 

 

class TouchPad:

 

    def __init__(self, pin, name):

        self.touch = Touch(pin)

        self.last_press = 0

        self.name = name

        self.pressed = False

 

    def ispressed(self):

        if self.touch.read() < (self.touch.init_value() * 2 / 3):

            self.pressed = True

        else:

            self.pressed = False

        return self.pressed

 

    def just_pressed(self):

        now = time.ticks_ms()

        if now - self.last_press < 500:

            return True

        else:

            return False

 

    def set_press(self):

        self.last_press = time.ticks_ms()

 

 

class TouchController:

 

    def __init__(self, pads):

        self.pads = pads

        for pad in self.pads:

            pad.touch.init_value(1500)

 

    def check(self, alarm):

        for pad in self.pads:

            if pad.ispressed() and not pad.just_pressed():

                pad.set_press()

                if pad.name == ′Right′:

                    if tleft.just_pressed():

                        print(′Swipe right′)

                    else:

                        print(′{} pressed′.format(pad.name))

                elif pad.name == ′Left′:

                    if tright.just_pressed():

                        print(′Swipe left′)

                    else:

Контакты

г. Москва, Пятницкое ш. д. 18, пав. 566

zakaz@compacttool.ru

8-495-752-55-22

Информация представленная на данном информационном ресурсе преследует исключительно рекламные цели и не является договором-офертой !

© Все права защищены 2021г https://compacttool.ru