Этот раздел содержит основные примеры, которые работают со всеми модулями Pycom и Платами расширения.
Используя плагин Pymakr, откройте и подключите устройство или используйте последовательный терминал (PuTTY, screen, picocom и т.д.). После подключения должен появиться пустой экран с мигающим курсором. Нажмите Enter, и появится приглашение MicroPython:
>>>
Убедитесь, что все работает с помощью простого теста:
>>> print("Hello LoPy!")
Hello LoPy!
В приведенном выше примере, символы >>> вводить не следует. Они указывают на то, что текст должен быть помещен сразу за приглашением. После набора команды и нажатии клавиши Enter, на экране должен появиться результат, такой же, как в примере.
Основные команды Python могут быть протестированы аналогичным образом. Если команда не работает, попробуйте выполнить либо аппаратный сброс, либо программный сброс: инструкция ниже.
Вот еще один пример использования аппаратных функций устройства:
>>> from machine import Pin
>>> led = Pin(′G16′, mode=Pin.OUT, value=1)
>>> led(0)
>>> led(1)
>>> led.toggle()
>>> 1 + 2
3
>>> 5 / 2
2.5
>>> 20 * ′py′
′pypypypypypypypypypypypypypypypypypypypy′
Если что-то пойдет не так, устройство можно сбросить двумя способами.
Первый способ. Нажмите CTRL-D в приглашении MicroPython, чтобы выполнить программный сброс. Появится следующее сообщение:
>>>
PYB: soft reboot
MicroPython v1.4.6-146-g1d8b5e5 on 2016-10-21; LoPy with ESP32
Type "help()" for more information.
>>>
Если это не помогло, можно выполнить аппаратный сброс (выключение / включение питания) нажатием переключателя RST (маленькая черная кнопка рядом с индикатором RGB), который завершит сеанс telnet и отключит программу, используемую для подключения к устройству Pycom.
WLAN - это системная функция всех устройств Pycom, поэтому она включена по умолчанию.
Чтобы извлечь текущий экземпляр объекта WLAN, запустите:
>>> from network import WLAN
>>> wlan = WLAN() # мы вызываем конструктор без параметров
Текущий режим (WLAN.AP после включения) может быть проверен выполнением:
>>> wlan.mode()
При изменении режима WLAN, согласно нижеприведенным инструкциям, соединение WLAN с устройством Pycom будет разорвано, и команды не будут работать в интерактивном режиме через WiFi.
Существует два способа избежать подобной ситуации:
Сетевой класс WLAN всегда загружается в режиме WLAN.AP; чтобы подключить его к существующей сети, WiFi должен быть настроен на роль клиента:
from network import WLAN
wlan = WLAN(mode=WLAN.STA)
Теперь устройство может приступить к поиску сети:
nets = wlan.scan()
for net in nets:
if net.ssid == ′mywifi′:
print(′Network found!′)
wlan.connect(net.ssid, auth=(net.sec, ′mywifikey′), timeout=5000)
while not wlan.isconnected():
machine.idle() # экономия энергии во время ожидания
print(′WLAN connection succeeded!′)
break
Если пользователю требуется, чтобы при загрузке устройство подключалось к домашнему роутеру с статическим IP-адресом, необходимо использовать следующий сценарий как /flash/boot.py:
import machine
from network import WLAN
wlan = WLAN() # получить текущий объект, без изменения режима
if machine.reset_cause() != machine.SOFT_RESET:
wlan.init(mode=WLAN.STA)
# Конфигурация ниже ДОЛЖНА соответствовать настройкам вашего домашнего маршрутизатора !!
wlan.ifconfig(config=(′192.168.178.107′, ′255.255.255.0′, ′192.168.178.1′, ′8.8.8.8′))
if not wlan.isconnected():
# Измените строку ниже в соответствии с идентфиикатором сети, шифрованием и паролем вашей сети:
wlan.connect(′mywifi′, auth=(WLAN.WPA2, ′mywifikey′), timeout=5000)
while not wlan.isconnected():
machine.idle() # экономия энергии во время ожидания
Обратите внимание, как мы проверяем причину сброса и состояние соединения: это очень важно для того, чтобы иметь возможность программного сброса LoPy во время сеанса telnet без разрыва соединения.
Следующий скрипт содержит список сетей и необязательный список wlan_config для установки фиксированного IP
import os
import machine
uart = machine.UART(0, 115200)
os.dupterm(uart)
known_nets = {
′′: {′pwd′: ′′},
′′: {′pwd′: ′′, ′wlan_config′: (′10.0.0.114′, ′255.255.0.0′, ′10.0.0.1′, ′10.0.0.1′)}, # (ip, subnet_mask, gateway, DNS_server)
}
if machine.reset_cause() != machine.SOFT_RESET:
from network import WLAN
wl = WLAN()
wl.mode(WLAN.STA)
original_ssid = wl.ssid()
original_auth = wl.auth()
print("Scanning for known wifi nets")
available_nets = wl.scan()
nets = frozenset([e.ssid for e in available_nets])
known_nets_names = frozenset([key for key in known_nets])
net_to_use = list(nets & known_nets_names)
try:
net_to_use = net_to_use[0]
net_properties = known_nets[net_to_use]
pwd = net_properties[′pwd′]
sec = [e.sec for e in available_nets if e.ssid == net_to_use][0]
if ′wlan_config′ in net_properties:
wl.ifconfig(config=net_properties[′wlan_config′])
wl.connect(net_to_use, (sec, pwd), timeout=10000)
while not wl.isconnected():
machine.idle() # экономия энергии во время ожидания
print("Connected to "+net_to_use+" with IP address:" + wl.ifconfig()[0])
except Exception as e:
print("Failed to connect to any known network, going into AP mode")
wl.init(mode=WLAN.AP, ssid=original_ssid, auth=original_auth, channel=6, antenna=WLAN.INT_ANT)
Соединение с EAP-TLS:
Перед подключением получите и скопируйте на устройство открытый и закрытый ключи: /flash/cert. Если требуется проверить открытый ключ сервера, также должен быть предоставлен соответствующий сертификат CA.
from network import WLAN
wlan = WLAN(mode=WLAN.STA)
wlan.connect(ssid=′mywifi′, auth=(WLAN.WPA2_ENT,), identity=′myidentity′, ca_certs=′/flash/cert/ca.pem′, keyfile=′/flash/cert/client.key′, certfile=′/flash/cert/client.crt′)
Соединение с EAP-PEAP или EAP-TTLS:
В случае EAP-PEAP (или EAP-TTLS) клиентский ключ и сертификат не нужны, только пара имени пользователя и пароля. Если требуется проверить открытый ключ сервера, также должен быть предоставлен соответствующий сертификат CA.
from network import WLAN
wlan = WLAN(mode=WLAN.STA)
wlan.connect(ssid=′mywifi′, auth=(WLAN.WPA2_ENT, ′username′, ′password′), identity=′myidentity′, ca_certs=′/flash/cert/ca.pem′)
В настоящее время доступна базовая функциональность BLE. В ближайшее время будут реализованы дополнительные функции, страница будет обновлена в соответствии с этими функциями.
Полную информацию о Bluetooth можно найти в разделе Справочника по API встроенного ПО.
Сканирование всех устройств в пределах досягаемости сканирующего устройства.
bluetooth.start_scan(10) # начинает сканирование и останавливается через 10 секунд bluetooth.start_scan(-1) # начинает сканирование бесконечно, пока не будет вызвана bluetooth.stop_scan()
Пример быстрого использования, который сканирует и печатает необработанные данные.
from network import Bluetooth
bluetooth = Bluetooth()
bluetooth.start_scan(-1) # начать сканирование без тайм-аута
while True:
print(bluetooth.get_adv())
from network import Bluetooth
import ubinascii
bluetooth = Bluetooth()
# сканирует, пока мы не сможем подключиться к устройству BLE
bluetooth.start_scan(-1)
adv = None
while True:
adv = bluetooth.get_adv()
if adv:
try:
bluetooth.connect(adv.mac)
except:
# возобновление сканирования
bluetooth.start_scan(-1)
continue
break
print("Connected to device with addr = {}".format(ubinascii.hexlify(adv.mac)))
Подключение к устройству с именем «Heart Rate» и получение данных.
from network import Bluetooth
import time
bt = Bluetooth()
bt.start_scan(-1)
while True:
adv = bt.get_adv()
if adv and bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL) == ′Heart Rate′:
try:
conn = bt.connect(adv.mac)
services = conn.services()
for service in services:
time.sleep(0.050)
if type(service.uuid()) == bytes:
print(′Reading chars from service = {}′.format(service.uuid()))
else:
print(′Reading chars from service = %x′ % service.uuid())
chars = service.characteristics()
for char in chars:
if (char.properties() & Bluetooth.PROP_READ):
print(′char {} value = {}′.format(char.uuid(), char.read()))
conn.disconnect()
break
except:
pass
else:
time.sleep(0.050)
Используйте resol_adv_data (), чтобы попытаться получить имя и данные производителя.
from network import Bluetooth
bluetooth = Bluetooth()
bluetooth.start_scan(20)
while bluetooth.isscanning():
adv = bluetooth.get_adv()
if adv:
# попытка получить полное имя
print(bluetooth.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL))
mfg_data = bluetooth.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA)
if mfg_data:
# попытка получить данные производителя (данные Apple iBeacon отправляются сюда)
print(ubinascii.hexlify(mfg_data))
Базовое соединение с использованием ssl.wrap_socket ().
import socket
import ssl
s = socket.socket()
ss = ssl.wrap_socket(s)
ss.connect(socket.getaddrinfo(′www.google.com′, 443)[0][-1])
Ниже приведен пример использования сертификатов с облаком blynk.
Сертификат был загружен из папки примеров blynk и помещен в / flash / cert / на устройство.
import socket
import ssl
s = socket.socket()
ss = ssl.wrap_socket(s, cert_reqs=ssl.CERT_REQUIRED, ca_certs=′/flash/cert/ca.pem′)
ss.connect(socket.getaddrinfo(′cloud.blynk.cc′, 8441)[0][-1])
Для получения дополнительной информации – изучите статью про модуль ssl в справочнике по API.
MQTT - это легкий протокол обмена сообщениями, который идеально подходит для отправки небольших пакетов данных на устройства IoT и с них через WiFi.
В этом примере используется бесплатный брокер IO Adafruit, который позволяет работать с MQTT.
Посетите https://io.adafruit.com/ и создайте аккаунт. Вам понадобится ключ API и ваши учетные данные. Изучите руководство для получения дополнительной информации о MQTT и о том, как использовать его с брокером Adafruit.
В этом примере будет отправлено сообщение теме в брокере Adafruit MQTT, а затем будет продемонстрировала функция подписки.
from mqtt import MQTTClient
from network import WLAN
import machine
import time
def sub_cb(topic, msg):
print(msg)
wlan = WLAN(mode=WLAN.STA)
wlan.connect("yourwifinetwork", auth=(WLAN.WPA2, "wifipassword"), timeout=5000)
while not wlan.isconnected():
machine.idle()
print("Connected to WiFi ")
client = MQTTClient("device_id", "io.adafruit.com",user="your_username", password="your_api_key", port=1883)
client.set_callback(sub_cb)
client.connect()
client.subscribe(topic="youraccount/feeds/lights")
while True:
print("Sending ON")
client.publish(topic="youraccount/feeds/lights", msg="ON")
time.sleep(1)
print("Sending OFF")
client.publish(topic="youraccount/feeds/lights", msg="OFF")
client.check_msg()
time.sleep(1)
Платформа IoT AWS позволяет устройствам подключаться к облаку Amazon и позволяет облачным приложениям взаимодействовать с подключенными к Интернету объектами. Обычные приложения IoT либо собирают и обрабатывают телеметрию с устройств, либо позволяют пользователям удаленно управлять устройством. Объекты сообщают о своем состоянии, публикуя сообщения в формате JSON на MQTT.
Настройка устройства (Pycom):
Конфигурация (config.py):
Этот файл содержит WiFi, пути к сертификатам и специфические настройки приложения, которые должны быть обновлены пользователем.
WIFI_SSID = ′my_wifi_ssid′
WIFI_PASS = ′my_wifi_password′
# AWS общая конфигурация
AWS_PORT = 8883
AWS_HOST = ′aws_host_url′
AWS_ROOT_CA = ′/flash/cert/aws_root.ca′
AWS_CLIENT_CERT = ′/flash/cert/aws_client.cert′
AWS_PRIVATE_KEY = ′/flash/cert/aws_private.key′
################## Subscribe / Publish client #################
CLIENT_ID = ′PycomPublishClient′
TOPIC = ′PublishTopic′
OFFLINE_QUEUE_SIZE = -1
DRAINING_FREQ = 2
CONN_DISCONN_TIMEOUT = 10
MQTT_OPER_TIMEOUT = 5
LAST_WILL_TOPIC = ′PublishTopic′
LAST_WILL_MSG = ′To All: Last will message′
####################### Shadow updater ########################
#THING_NAME = "my thing name"
#CLIENT_ID = "ShadowUpdater"
#CONN_DISCONN_TIMEOUT = 10
#MQTT_OPER_TIMEOUT = 5
####################### Delta Listener ########################
#THING_NAME = "my thing name"
#CLIENT_ID = "DeltaListener"
#CONN_DISCONN_TIMEOUT = 10
#MQTT_OPER_TIMEOUT = 5
####################### Shadow Echo ########################
#THING_NAME = "my thing name"
#CLIENT_ID = "ShadowEcho"
#CONN_DISCONN_TIMEOUT = 10
#MQTT_OPER_TIMEOUT = 5
Подписаться / опубликовать (main.py)
Чтобы подписаться на тему:
# указанная пользователем функция обратного вызова
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("-------------- ")
# конфигурация MQTT client
pycomAwsMQTTClient = AWSIoTMQTTClient(config.CLIENT_ID)
pycomAwsMQTTClient.configureEndpoint(config.AWS_HOST, config.AWS_PORT)
pycomAwsMQTTClient.configureCredentials(config.AWS_ROOT_CA, config.AWS_PRIVATE_KEY, config.AWS_CLIENT_CERT)
pycomAwsMQTTClient.configureOfflinePublishQueueing(config.OFFLINE_QUEUE_SIZE)
pycomAwsMQTTClient.configureDrainingFrequency(config.DRAINING_FREQ)
pycomAwsMQTTClient.configureConnectDisconnectTimeout(config.CONN_DISCONN_TIMEOUT)
pycomAwsMQTTClient.configureMQTTOperationTimeout(config.MQTT_OPER_TIMEOUT)
pycomAwsMQTTClient.configureLastWill(config.LAST_WILL_TOPIC, config.LAST_WILL_MSG, 1)
# Подключение к MQTT Host
if pycomAwsMQTTClient.connect():
print(′AWS connection succeeded′)
# Подписка на тему
pycomAwsMQTTClient.subscribe(config.TOPIC, 1, customCallback)
time.sleep(2)
# Отправить сообщение
loopCount = 0
while loopCount < 8:
pycomAwsMQTTClient.publish(config.TOPIC, "New Message " + str(loopCount), 1)
loopCount += 1
time.sleep(5.0)
Shadow Updater (main.py)
# указанные пользователем функции обратного вызова
def customShadowCallback_Update(payload, responseStatus, token):
if responseStatus == "timeout":
print("Update request " + token + " time out!")
if responseStatus == "accepted":
payloadDict = json.loads(payload)
print("Update request with token: " + token + " accepted!")
print("property: " + str(payloadDict["state"]["desired"]["property"]))
if responseStatus == "rejected":
print("Update request " + token + " rejected!")
def customShadowCallback_Delete(payload, responseStatus, token):
if responseStatus == "timeout":
print("Delete request " + token + " time out!")
if responseStatus == "accepted":
print("Delete request with token: " + token + " accepted!")
if responseStatus == "rejected":
print("Delete request " + token + " rejected!")
# конфигурация MQTT client
pycomAwsMQTTShadowClient = AWSIoTMQTTShadowClient(config.CLIENT_ID)
pycomAwsMQTTShadowClient.configureEndpoint(config.AWS_HOST, config.AWS_PORT)
pycomAwsMQTTShadowClient.configureCredentials(config.AWS_ROOT_CA, config.AWS_PRIVATE_KEY, config.AWS_CLIENT_CERT)
pycomAwsMQTTShadowClient.configureConnectDisconnectTimeout(config.CONN_DISCONN_TIMEOUT)
pycomAwsMQTTShadowClient.configureMQTTOperationTimeout(config.MQTT_OPER_TIMEOUT)
# подключение к MQTT Host
if pycomAwsMQTTShadowClient.connect():
print(′AWS connection succeeded′)
deviceShadowHandler = pycomAwsMQTTShadowClient.createShadowHandlerWithName(config.THING_NAME, True)
# удаление shadow JSON doc
deviceShadowHandler.shadowDelete(customShadowCallback_Delete, 5)
# обновление shadow in a loop
loopCount = 0
while True:
JSONPayload = ′{"state":{"desired":{"property":′ + str(loopCount) + ′}}}′
deviceShadowHandler.shadowUpdate(JSONPayload, customShadowCallback_Update, 5)
loopCount += 1
time.sleep(5)
Delta Listener (main.py)
# Пользовательский обратный вызов Shadow
def customShadowCallback_Delta(payload, responseStatus, token):
payloadDict = json.loads(payload)
print("property: " + str(payloadDict["state"]["property"]))
print("version: " + str(payloadDict["version"]))
# конфигурация MQTT client
pycomAwsMQTTShadowClient = AWSIoTMQTTShadowClient(config.CLIENT_ID)
pycomAwsMQTTShadowClient.configureEndpoint(config.AWS_HOST, config.AWS_PORT)
pycomAwsMQTTShadowClient.configureCredentials(config.AWS_ROOT_CA, config.AWS_PRIVATE_KEY, config.AWS_CLIENT_CERT)
pycomAwsMQTTShadowClient.configureConnectDisconnectTimeout(config.CONN_DISCONN_TIMEOUT)
pycomAwsMQTTShadowClient.configureMQTTOperationTimeout(config.MQTT_OPER_TIMEOUT)
# подключение к MQTT Host
if pycomAwsMQTTShadowClient.connect():
print(′AWS connection succeeded′)
deviceShadowHandler = pycomAwsMQTTShadowClient.createShadowHandlerWithName(config.THING_NAME, True)
# Listen on deltas
deviceShadowHandler.shadowRegisterDeltaCallback(customShadowCallback_Delta)
# бесконечный цикл
while True:
time.sleep(1)
Этот пример представляет собой простой пример АЦП. Для получения дополнительной информации см. АЦП.
from machine import ADC
adc = ADC(0)
adc_c = adc.channel(pin=′P13′)
adc_c()
adc_c.value()
В настоящее время АЦП ESP32 не откалиброван по умолчанию. Это означает, что он должен быть откалиброван перед каждым использованием. Для этого необходимо измерить внутренний источник опорного напряжения. Следующий код подключит 1.1v к P22:
from machine import ADC
adc = ADC()
# Output Vref of P22
adc.vref_to_pin(′P22′)
Теперь, когда опорное напряжение доступно извне, вы должны измерить его с максимально точным вольтметром. Запишите значение в милливольтах, например, 1120. Для того, чтобы отключить 1.1V от P22, сбросьте модуль. Теперь вы можете откалибровать АЦП, сообщив ему истинное значение напряжения. Затем следует проверить калибровку, подключив АЦП к источнику напряжения.
# Установите калибровку - см. Примечание выше
adc.vref(1100)
# проверьте калибровку
adc_c = adc.channel(pin=′P16′, attn=ADC.ATTN_11DB)
print(adc_c.voltage())
Следующий пример получает данные от датчика освещенности с использованием I2C. Используемый датчик - цифровой датчик света BH1750FVI.
import time
from machine import I2C
import bh1750fvi
i2c = I2C(0, I2C.MASTER, baudrate=100000)
light_sensor = bh1750fvi.BH1750FVI(i2c, addr=i2c.scan()[0])
while(True):
data = light_sensor.read()
print(data)
time.sleep(1)
Драйвера для BH1750FVI
Поместите этот пример кода в файл с именем bh1750fvi.py. Затем его можно импортировать как библиотеку.
# Простой драйвер для цифрового датчика света BH1750FVI
class BH1750FVI:
MEASUREMENT_TIME = const(120)
def __init__(self, i2c, addr=0x23, period=150):
self.i2c = i2c
self.period = period
self.addr = addr
self.time = 0
self.value = 0
self.i2c.writeto(addr, bytes([0x10])) # start continuos 1 Lux readings every 120ms
def read(self):
self.time += self.period
if self.time >= MEASUREMENT_TIME:
self.time = 0
data = self.i2c.readfrom(self.addr, 2)
self.value = (((data[0] << 8) + data[1]) * 1200) // 1000
return self.value
Датчик освещенности и LoRa
Это тот же код с добавленным подключением LoRa, отправляющий значение люкс от датчика освещенности на другое устройство с поддержкой LoRa.
import socket
import time
import pycom
import struct
from network import LoRa
from machine import I2C
import bh1750fvi
LORA_PKG_FORMAT = "!BH"
LORA_CONFIRM_FORMAT = "!BB"
DEVICE_ID = 1
pycom.heartbeat(False)
lora = LoRa(mode=LoRa.LORA, tx_iq=True, region=LoRa.EU868)
lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
lora_sock.setblocking(False)
i2c = I2C(0, I2C.MASTER, baudrate=100000)
light_sensor = bh1750fvi.BH1750FVI(i2c, addr=i2c.scan()[0])
while(True):
msg = struct.pack(LORA_PKG_FORMAT, DEVICE_ID, light_sensor.read())
lora_sock.send(msg)
pycom.rgbled(0x150000)
wait = 5
while (wait > 0):
wait = wait - 0.1
time.sleep(0.1)
recv_data = lora_sock.recv(64)
if (len (recv_data) >= 2):
status, device_id = struct.unpack(LORA_CONFIRM_FORMAT, recv_data)
if (device_id == DEVICE_ID and status == 200):
pycom.rgbled(0x001500)
wait = 0
time.sleep(1)
г. Москва, Пятницкое ш. д. 18, пав. 566
zakaz@compacttool.ru
8-495-752-55-22
Информация представленная на данном информационном ресурсе преследует исключительно рекламные цели и не является договором-офертой !
© Все права защищены 2015 - 2024г https://compacttool.ru