Каталог

Примеры LoRa

Следующие руководства демонстрируют функциональность LoRa в LoPy. LoRa может работать в двух разных режимах; LoRa-MAC (который мы также называем Raw-LoRa) и режим LoRaWAN.

Режим LoRa-MAC в основном обращается к радиосвязи напрямую, и пакеты отправляются с использованием модуляции LoRa на выбранной частоте без каких-либо заголовков, адресации или шифрования. В конец пакета добавляется только CRC, и он удаляется до того, как полученный кадр передается приложению. Этот режим может использоваться для создания любого протокола более высокого уровня, который может использовать преимущества дальнего действия модуляции LoRa. Типичные случаи использования включают в себя прямую связь между LoPy и LoPy и пересылку пакетов LoRa.

Режим LoRaWAN реализует полный стек LoRaWAN для устройства класса А. Он поддерживает методы подключения OTAA и ABP, а также расширенные функции, такие как добавление и удаление пользовательских каналов для поддержки «специальных» частотных планов, подобных тем, которые используются в Новой Зеландии.

LoRa-MAC (Raw LoRa)

Базовый пример подключения LoRa, отправка и получение данных. В режиме LoRa-MAC радио используется напрямую без LoRaWAN. Отправленные данные никоим образом не форматируются и не шифруются, не добавляется адресная информация.

Для приведенного ниже примера вам понадобятся два LoPy. Цикл while со случайным временем задержки используется для минимизации шансов одновременной передачи двух LoPy. Запустите приведенный ниже код на двух модулях LoPy, и вы увидите слово «Hello», полученное с обеих сторон.

from network import LoRa

import socket

import machine

import time




# инициализация LoRa в режиме LORA

# Пожалуйста, выберите регион, где вы используете устройство:

# Азия = LoRa.AS923

# Австралия = LoRa.AU915

# Европа = LoRa.EU868

# United States = LoRa.US915

# можно также указать больше параметров, таких как частота, мощность передачи и коэффициент расширения

lora = LoRa (mode = LoRa.LORA, region = LoRa.EU868)




# создать необработанный сокет LoRa

s = socket.socket (socket.AF_LORA, socket.SOCK_RAW)




while True:

    # отправка данных

    s.setblocking(True)

    s.send(‘Hello‘)




    # получение данных

    s.setblocking(False)

    data = s.recv(64)

    print(data)




    # подождать некоторое время

    time.sleep(machine.rng() & 0x0F)

LoRaWAN с ОТАА

OTAA расшифровывается как аутентификация по воздуху. С помощью этого метода LoPy отправляет запрос на присоединение к шлюзу LoRaWAN, используя предоставленные APPEUI и APPKEY. Если ключи верны, шлюз ответит на LoPy сообщением о принятии соединения, и с этого момента LoPy сможет отправлять и получать пакеты на / от шлюза. Если ключи неверны, ответ не будет получен, а метод has_joined () всегда будет возвращать False.

В приведенном ниже примере делается попытка получить любые данные, полученные после отправки кадра. Имейте в виду, что шлюз может не отправлять данные обратно, поэтому мы делаем неблокирование сокета перед данной попыткой, чтобы предотвратить застревание в ожидании пакета, который никогда не поступит.

from network import LoRa

import socket

import time

import ubinascii




# Инициализация LoRa в режиме LORAWAN.

# Пожалуйста, выберите регион, который соответствует тому, где вы используете устройство:

# Азия = LoRa.AS923

# Австралия = LoRa.AU915

# Европа = LoRa.EU868

# United States = LoRa.US915

lora = LoRa (mode = LoRa.LORAWAN, region = LoRa.EU868)




# создать параметры аутентификации OTAA

app_eui = ubinascii.unhexlify (‘ADA4DAE3AC12676B‘)

app_key = ubinascii.unhexlify (‘11B0282A189B75B0B4D2D8C7FA38548B‘)




# присоединиться к сети, используя OTAA

lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), timeout=0)




# ждать, пока модуль не подключится к сети

while not lora.has_joined():

    time.sleep(2.5)

    print(‘Not yet joined...‘)




# создать сокет LoRa

s = socket.socket (socket.AF_LORA, socket.SOCK_RAW)




# установить скорость передачи данных LoRaWAN

s.setsockopt (socket.SOL_LORA, socket.SO_DR, 5)




# блокировка сокета

# (ожидает отправки данных и истечения срока действия двух окон приема)

s.setblocking (True)




# отправить некоторые данные

s.send(bytes([0x01, 0x02, 0x03]))




# сделать сокет неблокирующим

# (потому что если данные не получены, то будет блокировка)

s.setblocking (False)




# получить любые данные (если есть)

data = s.recv(64)

print(data)

LoRaWAN с ABP

ABP расшифровывается как Authentication By Personalization. Это означает, что ключи шифрования настраиваются на устройстве вручную и могут начинать отправку кадров на шлюз без необходимости процедуры «рукопожатия» для обмена ключами (например, процедуры, выполняемой во время соединения OTAA).

В приведенном ниже примере делается попытка получить любые данные после отправки кадра. Имейте в виду, что шлюз может не отправлять данные обратно, поэтому мы делаем неблокирование сокета перед попыткой его получения, чтобы предотвратить застревание в ожидании пакета, который никогда не поступит.

from network import LoRa

import socket

import ubinascii

import struct




# Инициализировать LoRa в режиме LORAWAN.

# Пожалуйста, выберите регион, который соответствует тому, где вы используете устройство:

# Азия = LoRa.AS923

# Австралия = LoRa.AU915

# Европа = LoRa.EU868

# United States = LoRa.US915

lora = LoRa (mode = LoRa.LORAWAN, region = LoRa.EU868)




# создать параметры аутентификации ABP

dev_addr = struct.unpack(">l", ubinascii.unhexlify(‘00000005‘))[0]

nwk_swkey = ubinascii.unhexlify(‘2B7E151628AED2A6ABF7158809CF4F3C‘)

app_swkey = ubinascii.unhexlify(‘2B7E151628AED2A6ABF7158809CF4F3C‘)




# присоединиться к сети с помощью ABP

lora.join(activation=LoRa.ABP, auth=(dev_addr, nwk_swkey, app_swkey))




# создать сокет LoRa

s = socket.socket (socket.AF_LORA, socket.SOCK_RAW)




# установить скорость передачи данных LoRaWAN

s.setsockopt (socket.SOL_LORA, socket.SO_DR, 5)




# сделать блокировку сокета

# (ожидает отправки данных)

s.setblocking (True)




# отправить данные

s.send (байты ([0x01, 0x02, 0x03]))




# сделать сокет неблокирующим

# (потому что если данные не получены, то будет блокировка)

s.setblocking (False)




# получить любые данные (если есть)

data = s.recv(64)

print(data)

LoRa-MAC Nano-Gateway

В этом примере допускается необработанное соединение LoRa между двумя LoPy (узлами) с одним LoPy, действующим как нано-шлюз.

Для получения дополнительной информации и обсуждения этого кода, см. пост на форуме.

Код шлюза

import socket

import struct

from network import LoRa




# Базовый заголовок пакета, B: 1 байт для идентификатора устройства, B: 1 байт для pkg size, %ds: отформатированная строка для string

_LORA_PKG_FORMAT = "! BB% ds"

# Базовый ack пакет, B: 1 байт для идентификатора устройства, B: 1 байт для размера pkg, B: 1 байт для Ok (200) или сообщений об ошибках.

_LORA_PKG_ACK_FORMAT = "BBB"




# Откройте LoRa Socket, используйте rx_iq, чтобы не слышать наши сообщения

# Пожалуйста, выберите регион, который соответствует тому, где вы используете устройство:

# Asia = LoRa.AS923

# Australia = LoRa.AU915

# Europe = LoRa.EU868

# United States = LoRa.US915




lora = LoRa(mode=LoRa.LORA, rx_iq=True, region=LoRa.EU868)

lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)

lora_sock.setblocking(False)




while (True):

    recv_pkg = lora_sock.recv(512)

    if (len(recv_pkg) > 2):

        recv_pkg_len = recv_pkg[1]




        device_id, pkg_len, msg = struct.unpack(_LORA_PKG_FORMAT % recv_pkg_len, recv_pkg)




# Если в boot.py установлены uart = machine.UART (0, 115200) и os.dupterm (uart), этот отпечаток должен появиться в последовательном порту

  print(‘Device: %d - Pkg:  %s‘ % (device_id, msg))




        ack_pkg = struct.pack(_LORA_PKG_ACK_FORMAT, device_id, 1, 200)

        lora_sock.send(ack_pkg)

_LORA_PKG_FORMAT используется как метод идентификации различных устройств в сети. _LORA_PKG_ACK_FORMAT –простой ack пакет в ответ на пакет узлов.

Узел

import os

import socket

import time

import struct

from network import LoRa




# Базовый заголовок пакета, B: 1 байт для идентификатора устройства, B: 1 байт для pkg size

_LORA_PKG_FORMAT = "BB% ds"

_LORA_PKG_ACK_FORMAT = "BBB"

DEVICE_ID = 0x01







# Откройте сокет Lora, используйте tx_iq, чтобы не слышать наши сообщения

# Пожалуйста, выберите регион, который соответствует тому, где вы используете устройство:

# Asia = LoRa.AS923

# Australia = LoRa.AU915

# Europe = LoRa.EU868

# United States = LoRa.US915

lora = LoRa(mode=LoRa.LORA, tx_iq=True, region=LoRa.EU868)

lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)

lora_sock.setblocking(False)




while(True):

    # Пакет отправки, содержащий простую строку

msg = "Device 1 Here"

    pkg = struct.pack(_LORA_PKG_FORMAT % len(msg), DEVICE_ID, len(msg), msg)

    lora_sock.send(pkg)




    # Ждите ответа от шлюза. Для этой демонстрации устройство выполняет бесконечный цикл в ожидании ответа. Введите max_time_waiting для вашего приложения

   waiting_ack = True

    while(waiting_ack):

        recv_ack = lora_sock.recv(256)




        if (len(recv_ack) > 0):

            device_id, pkg_len, ack = struct.unpack(_LORA_PKG_ACK_FORMAT, recv_ack)

            if (device_id == DEVICE_ID):

                if (ack == 200):

                    waiting_ack = False

                    # Если в boot.py установлены uart = machine.UART (0, 115200) и os.dupterm (uart), то в последовательном порту должно это появиться

print("ACK")

                else:

                    waiting_ack = False

                    # Если в boot.py установлены uart = machine.UART (0, 115200) и os.dupterm (uart), то в последовательном порту должно это появиться




print("Message Failed")




time.sleep (5)

Узел всегда отправляет пакеты и ожидает подтверждения от шлюза

Чтобы адаптировать этот код к конкретным потребностям пользователя:

  • Установите максимальное время ожидания для подтверждения и повторной отправки пакета или пометьте его как недействительный
  • Увеличьте размер пакета, изменив _LORA_PKG_FORMAT на BH% ds. «H» позволит хранить 2 байта (для получения дополнительной информации о формате структуры ознакомьтесь с документацией)
  • Уменьшите размер пакета с помощью побитовой манипуляции
  • Уменьшите размер сообщения (в этом примере - строку) до чего-то более полезного для конкретной разработки

LoPy to LoPy

В этом примере показано, как соединить два Pycode LoRa-совместимых модуля (узла) через необработанный LoRa.

Узел А

from network import LoRa

import socket

import time




# Пожалуйста, выберите регион, который соответствует тому, где вы используете устройство:

# Asia = LoRa.AS923

# Australia = LoRa.AU915

# Europe = LoRa.EU868

# United States = LoRa.US915

lora = LoRa(mode=LoRa.LORA, region=LoRa.EU868)

s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)

s.setblocking(False)




while True:

    if s.recv(64) == b‘Ping‘:

        s.send(‘Pong‘)

    time.sleep(5)

Узел Б

from network import LoRa

import socket

import time




# Пожалуйста, выберите регион, который соответствует тому, где вы используете устройство:

# Asia = LoRa.AS923

# Australia = LoRa.AU915

# Europe = LoRa.EU868

# United States = LoRa.US915

lora = LoRa(mode=LoRa.LORA, region=LoRa.EU868)

s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)

s.setblocking(False)

while True:

    s.send(‘Ping‘)

    time.sleep(5)

LoRaWAN Nano-Gateway

Этот пример позволяет подключить LoPy к сети LoRaWAN, такой как The Things Network (TTN) или Loriot, которая будет использоваться в качестве нано-шлюза.

В этом примере используются параметры, специально предназначенные для подключения к The Things Network в европейском регионе 868 МГц. Для другого использования, пожалуйста, смотрите файл config.py для соответствующих разделов, которые необходимо изменить.

Последние версии этих фрагментов можно найти в следующем репозитории GitHub. Для получения дополнительной информации и обсуждения этого кода изучите пост на форуме.

Nano-Gateway

Код Nano-Gateway разделен на 3 файла: main.py, config.py и nanogateway.py. Они используются для настройки и определения того, как шлюз будет подключаться к предпочтительной сети и как он может действовать в качестве сервера пересылки пакетов.

Идентификатор шлюза

Большинство сетевых серверов LoRaWAN ожидают идентификатор шлюза в виде уникального 64-разрядного шестнадцатеричного числа (называемого EUI-64). Рекомендуемой практикой является создание этого идентификатора на вашей плате путем расширения MAC-адреса WiFi (48-битное число, называемое MAC-48). Вы можете сделать это, запустив следующий код перед настройкой:

from network import WLAN

import ubinascii

wl = WLAN()

ubinascii.hexlify(wl.mac())[:6] + ‘FFFE‘ + ubinascii.hexlify(wl.mac())[6:]

Результатом будет что-то вроде b‘240ac4FFFE008d88 ‘, где 240ac4FFFE008d88 - ваш идентификатор шлюза, который будет использоваться в конфигурации вашего сетевого провайдера.

Main (main.py)

Этот файл запускается при загрузке и вызывает файлы библиотеки и config.py для инициализации нано-шлюза. После настройки конфигурации нано-шлюз запускается.

Пример использования LoPy LoRaWAN Nano Gateway

 

import config

from nanogateway import NanoGateway




if __name__ == ‘__main__‘:

    nanogw = NanoGateway(

        id=config.GATEWAY_ID,

        frequency=config.LORA_FREQUENCY,

        datarate=config.LORA_GW_DR,

        ssid=config.WIFI_SSID,

        password=config.WIFI_PASS,

        server=config.SERVER,

        port=config.PORT,

        ntp_server=config.NTP,

        ntp_period=config.NTP_PERIOD_S

        )




    nanogw.start()

    nanogw._log(‘You may now press ENTER to enter the REPL‘)

    input()

Конфигурация (config.py)

Этот файл содержит настройки для сервера и сети, к которой он подключается. В зависимости от региона нано-шлюза и провайдера (TTN, Loriot и т.д.) - они будут различаться. Приведенный пример будет работать с The Things Network (TTN) в европейском регионе, 868 МГц.

Идентификатор шлюза генерируется в сценарии с использованием процесса, описанного выше.

Измените переменные WIFI_SSID и WIFI_PASS, чтобы они соответствовали вашей сети Wi-Fi.

Опции конфигурации LoPy LoRaWAN Nano Gateway

import machine

import ubinascii




WIFI_MAC = ubinascii.hexlify(machine.unique_id()).upper()




# Установите идентификатор шлюза - как первые 3 байта MAC-адреса + «FFFE» + последние 3 байта MAC-адреса

GATEWAY_ID = WIFI_MAC[:6] + "FFFE" + WIFI_MAC[6:12]




SERVER = ‘router.eu.thethings.network‘

PORT = 1700




NTP = "pool.ntp.org"

NTP_PERIOD_S = 3600




WIFI_SSID = ‘my-wifi‘

WIFI_PASS = ‘my-wifi-password‘




# для EU868

LORA_FREQUENCY = 868100000

LORA_GW_DR = "SF7BW125" # DR_5

LORA_NODE_DR = 5




# для US915

# LORA_FREQUENCY = 903900000

# LORA_GW_DR = "SF7BW125" # DR_3

# LORA_NODE_DR = 3

Библиотека (nanogateway.py)

Библиотека контролирует все процессы генерации и пересылки пакетов для данных LoRa. Для этого не требуется никакой пользовательской конфигурации, а последнюю версию этого кода следует загружать из репозитория Pycom GitHub.

Класс LoPy Nano Gateway

from network import WLAN

from network import LoRa

from machine import Timer

import os

import ubinascii

import machine

import json

import time

import errno

import _thread

import socket







PROTOCOL_VERSION = const(2)




PUSH_DATA = const(0)

PUSH_ACK = const(1)

PULL_DATA = const(2)

PULL_ACK = const(4)

PULL_RESP = const(3)




TX_ERR_NONE = "NONE"

TX_ERR_TOO_LATE = "TOO_LATE"

TX_ERR_TOO_EARLY = "TOO_EARLY"

TX_ERR_COLLISION_PACKET = "COLLISION_PACKET"

TX_ERR_COLLISION_BEACON = "COLLISION_BEACON"

TX_ERR_TX_FREQ = "TX_FREQ"

TX_ERR_TX_POWER = "TX_POWER"

TX_ERR_GPS_UNLOCKED = "GPS_UNLOCKED"




STAT_PK = {"stat": {"time": "", "lati": 0,

                   "long": 0, "alti": 0,

                   "rxnb": 0, "rxok": 0,

                   "rxfw": 0, "ackr": 100.0,

                   "dwnb": 0, "txnb": 0}}




RX_PK = {"rxpk": [{"time": "", "tmst": 0,

                  "chan": 0, "rfch": 0,

                  "freq": 868.1, "stat": 1,

                  "modu": "LORA", "datr": "SF7BW125",

                  "codr": "4/5", "rssi": 0,

                  "lsnr": 0, "size": 0,

                  "data": ""}]}




TX_ACK_PK = {"txpk_ack":{"error":""}}







class NanoGateway:




    def __init__(self, id, frequency, datarate, ssid, password, server, port, ntp=‘pool.ntp.org‘, ntp_period=3600):

            self.id = id

        self.frequency = frequency

        self.sf = self._dr_to_sf(datarate)

        self.ssid = ssid

        self.password = password

        self.server = server

        self.port = port

        self.ntp = ntp

        self.ntp_period = ntp_period




        self.rxnb = 0

        self.rxok = 0

                self.rxfw = 0

                self.dwnb = 0

                self.txnb = 0




        self.stat_alarm = None

                self.pull_alarm = None

                self.uplink_alarm = None




        self.udp_lock = _thread.allocate_lock()




        self.lora = None

        self.lora_sock = None




    def start(self):




        # Переключить WiFi в режим STA и подключиться

self.wlan = WLAN(mode=WLAN.STA)

        self._connect_to_wifi()




        # Синхронизировать время

    self.rtc = machine.RTC()

        self.rtc.ntp_sync(self.ntp, update_period=self.ntp_period)




        # Получить IP-адрес сервера и создать сокет UDP

        self.server_ip = socket.getaddrinfo(self.server, self.port)[0][-1]

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.sock.setblocking(False)




        # Нажать первый раз 

        self._push_data (self._make_stat_packet ())




        # Создать будильник

        self.stat_alarm = Timer.Alarm(handler=lambda t: self._push_data(self._make_stat_packet()), s=60, periodic=True)

        self.pull_alarm = Timer.Alarm(handler=lambda u: self._pull_data(), s=25, periodic=True)




        # Запустить поток приема UDP

        _thread.start_new_thread (self._udp_thread, ())




        # Инициализировать LoRa в режиме LORA

        self.lora = LoRa(mode=LoRa.LORA, frequency=self.frequency, bandwidth=LoRa.BW_125KHZ, sf=self.sf,

                        preamble=8, coding_rate=LoRa.CODING_4_5, tx_iq=True)




        # Создать необработанный сокет LoRa

        self.lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)

        self.lora_sock.setblocking(False)

        self.lora_tx_done = False




        self.lora.callback(trigger=(LoRa.RX_PACKET_EVENT | LoRa.TX_PACKET_EVENT), handler=self._lora_cb)




    def stop(self):




        # Проверьте, как остановить синхронизацию NTP

        # Создайте функцию отмены для будильника

        # Остановите UDP thread

        self.sock.close ()




    def _connect_to_wifi (self):

        self.wlan.connect (self.ssid, auth = (None, self.password))

        while not self.wlan.isconnected():

            time.sleep(0.5)

        print("WiFi connected!")




    def _dr_to_sf(self, dr):

        sf = dr[2:4]

        if sf[1] not in ‘0123456789‘:

            sf = sf[:1]

        return int(sf)




    def _sf_to_dr(self, sf):

        return "SF7BW125"




    def _make_stat_packet(self):

        now = self.rtc.now()

        STAT_PK["stat"]["time"] = "%d-%02d-%02d %02d:%02d:%02d GMT" % (now[0], now[1], now[2], now[3], now[4], now[5])

        STAT_PK["stat"]["rxnb"] = self.rxnb

        STAT_PK["stat"]["rxok"] = self.rxok

        STAT_PK["stat"]["rxfw"] = self.rxfw

        STAT_PK["stat"]["dwnb"] = self.dwnb

        STAT_PK["stat"]["txnb"] = self.txnb

        return json.dumps(STAT_PK)




    def _make_node_packet(self, rx_data, rx_time, tmst, sf, rssi, snr):

        RX_PK["rxpk"][0]["time"] = "%d-%02d-%02dT%02d:%02d:%02d.%dZ" % (rx_time[0], rx_time[1], rx_time[2], rx_time[3], rx_time[4], rx_time[5], rx_time[6])

        RX_PK["rxpk"][0]["tmst"] = tmst

        RX_PK["rxpk"][0]["datr"] = self._sf_to_dr(sf)

        RX_PK["rxpk"][0]["rssi"] = rssi

        RX_PK["rxpk"][0]["lsnr"] = float(snr)

        RX_PK["rxpk"][0]["data"] = ubinascii.b2a_base64(rx_data)[:-1]

        RX_PK["rxpk"][0]["size"] = len(rx_data)

        return json.dumps(RX_PK)




    def _push_data(self, data):

        token = os.urandom(2)

        packet = bytes([PROTOCOL_VERSION]) + token + bytes([PUSH_DATA]) + ubinascii.unhexlify(self.id) + data

        with self.udp_lock:

            try:

                self.sock.sendto(packet, self.server_ip)

            except Exception:

                print("PUSH exception")




    def _pull_data(self):

        token = os.urandom(2)

        packet = bytes([PROTOCOL_VERSION]) + token + bytes([PULL_DATA]) + ubinascii.unhexlify(self.id)

        with self.udp_lock:

            try:

                self.sock.sendto(packet, self.server_ip)

            except Exception:

                print("PULL exception")




    def _ack_pull_rsp(self, token, error):

        TX_ACK_PK["txpk_ack"]["error"] = error

        resp = json.dumps(TX_ACK_PK)

        packet = bytes([PROTOCOL_VERSION]) + token + bytes([PULL_ACK]) + ubinascii.unhexlify(self.id) + resp

        with self.udp_lock:

            try:

                self.sock.sendto(packet, self.server_ip)

            except Exception:

                print("PULL RSP ACK exception")




    def _lora_cb(self, lora):

        events = lora.events()

        if events & LoRa.RX_PACKET_EVENT:

            self.rxnb += 1

            self.rxok += 1

            rx_data = self.lora_sock.recv(256)

            stats = lora.stats()

            self._push_data(self._make_node_packet(rx_data, self.rtc.now(), stats.timestamp, stats.sf, stats.rssi, stats.snr))

            self.rxfw += 1

        if events & LoRa.TX_PACKET_EVENT:

            self.txnb += 1

            lora.init(mode=LoRa.LORA, frequency=self.frequency, bandwidth=LoRa.BW_125KHZ,

                     sf=self.sf, preamble=8, coding_rate=LoRa.CODING_4_5, tx_iq=True)




    def _send_down_link(self, data, tmst, datarate, frequency):

        self.lora.init(mode=LoRa.LORA, frequency=frequency, bandwidth=LoRa.BW_125KHZ,

                      sf=self._dr_to_sf(datarate), preamble=8, coding_rate=LoRa.CODING_4_5,

                      tx_iq=True)

        while time.ticks_us() < tmst:

            pass

        self.lora_sock.send(data)




    def _udp_thread(self):

        while True:

            try:

                data, src = self.sock.recvfrom(1024)

                _token = data[1:3]

                _type = data[3]

                if _type == PUSH_ACK:

                    print("Push ack")

                elif _type == PULL_ACK:

                    print("Pull ack")

                elif _type == PULL_RESP:

                    self.dwnb += 1

                    ack_error = TX_ERR_NONE

                    tx_pk = json.loads(data[4:])

                    tmst = tx_pk["txpk"]["tmst"]

                    t_us = tmst - time.ticks_us() - 5000

                    if t_us < 0:

                        t_us += 0xFFFFFFFF

                    if t_us < 20000000:

                        self.uplink_alarm = Timer.Alarm(handler=lambda x: self._send_down_link(ubinascii.a2b_base64(tx_pk["txpk"]["data"]),

                                                                                              tx_pk["txpk"]["tmst"] - 10, tx_pk["txpk"]["datr"],

                                                                                              int(tx_pk["txpk"]["freq"] * 1000000)), us=t_us)

                    else:

                        ack_error = TX_ERR_TOO_LATE

                        print("Downlink timestamp error!, t_us:", t_us)

                    self._ack_pull_rsp(_token, ack_error)

                    print("Pull rsp")

            except socket.timeout:

                pass

            except OSError as e:

                if e.errno == errno.EAGAIN:

                    pass

                else:

                    print("UDP recv OSError Exception")

            except Exception:

                print("UDP recv Exception")

            # Сделайте паузу перед следующей попыткой

            time.sleep (0.025)

Регистрация ТТН

Чтобы настроить шлюз с The Things Network (TTN), перейдите на их веб-сайт и создайте / зарегистрируйте учетную запись. Введите имя пользователя и адрес электронной почты, чтобы проверить их платформу.

Регистрация ТТN

Как только учетная запись была зарегистрирована, Nano-Gateway может быть зарегистрирован. Для этого перейдите на веб-страницу консоли TTN.

Регистрация Gateway

Внутри консоли TTN есть два варианта: приложения и шлюзы. Выберите шлюзы и затем нажмите «Зарегистрировать шлюз». Это позволит настроить и зарегистрировать новый нано-шлюз.

Регистрация гейтвэй

На странице «Регистрация шлюза» вам необходимо установить следующие параметры:

Параметры регистрации шлюза

 Они уникальны для каждого шлюза, местоположения и частоты конкретной страны. Убедитесь, что выбраны правильные настройки, иначе шлюз не будет подключаться к TTN.

Вам нужно поставить галочку «Я использую устаревший перенаправитель пакетов», чтобы включить правильные настройки. Это связано с тем, что Nano-Gateway использует стандартный де-факто протокол Semtech UDP.

Значения опций шлюза

Gateway EUI должен соответствовать вашему идентификатору из файла config.py. Мы предлагаем вам выполнить процедуру, описанную в верхней части этого документа, чтобы создать свой собственный уникальный идентификатор шлюза.

После применения этих настроек нажмите «Зарегистрировать шлюз». Появится страница обзора шлюза с отображением параметров конфигурации. Затем нажмите на «Настройки шлюза» и настройте адрес маршрутизатора в соответствии с адресом шлюза (по умолчанию: router.eu.thethings.network).

Регистрация шлюза

Теперь шлюз должен быть настроен. Один или несколько узлов теперь могут быть настроены на использование нано-шлюза, и могут быть созданы приложения TTN.

LoPy Node

Существует два способа подключения устройств LoPy к нано-шлюзу: активация по воздуху (OTAA) и активация посредством персонализации (ABP). Код и инструкции для регистрации этих методов показаны ниже, далее идёт инструкция о том, как подключить их к приложению на TTN.

Важно, чтобы именно следующие примеры кода (также на GitHub) использовались для подключения к нано-шлюзу, поскольку он поддерживает только одноканальные соединения.

ОТАА (активация по воздуху)

Когда LoPy подключает приложение (через TTN) с помощью OTAA, конфигурация сети определяется автоматически во время рукопожатия между LoPy и сетевым сервером. Обратите внимание, что сетевые ключи, полученные с использованием методологии OTAA, специфичны для устройства и используются для шифрования и проверки передач на сетевом уровне.

Пример узла OTAA, совместимый с шлюзом LoPy Nano

from network import LoRa

import socket

import ubinascii

import struct

import time




# Инициализировать LoRa в режиме LORAWAN.

lora = LoRa (mode = LoRa.LORAWAN)




# создать параметры аутентификации OTA

dev_eui = ubinascii.unhexlify (‘AABBCCDDEEFF7778‘) # эти настройки можно найти в TTN

app_eui = ubinascii.unhexlify (‘70B3D57EF0003BFD‘) # эти настройки можно найти в TTN

app_key = ubinascii.unhexlify (‘36AB7625FE77776881683B495300FFD6‘) # эти настройки можно найти в TTN




# установить 3 канала по умолчанию на одинаковую частоту (должно быть сделано перед отправкой запроса на присоединение OTAA)

lora.add_channel(0, frequency=868100000, dr_min=0, dr_max=5)

lora.add_channel(1, frequency=868100000, dr_min=0, dr_max=5)

lora.add_channel(2, frequency=868100000, dr_min=0, dr_max=5)




# присоединиться к сети, используя OTAA

lora.join(activation=LoRa.OTAA, auth=(dev_eui, app_eui, app_key), timeout=0)




# ждать, пока модуль не подключится к сети

while not lora.has_joined():

    time.sleep(2.5)

    print(‘Not joined yet...‘)




# удалить все каналы не по умолчанию

for i in range(3, 16):

    lora.remove_channel(i)




# создать сокет LoRa

s = socket.socket (socket.AF_LORA, socket.SOCK_RAW)




# установить скорость передачи данных LoRaWAN

s.setsockopt (socket.SOL_LORA, socket.SO_DR, 5)




# сделать сокет неблокирующим

s.setblocking (False)




time.sleep (5.0)




Ниже может быть написан ваш собственный код!







for i in range (200):

    s.send(b‘PKT #‘ + bytes([i]))

    time.sleep(4)

    rx = s.recv(256)

    if rx:

        print(rx)

    time.sleep(6)

ABP (активация по персонализации)

Использование режима соединения ABP требует от пользователя определения следующих значений и их ввода в приложение LoPy и приложение TTN:

  • Адрес устройства
  • Ключ приложения
  • Ключ сетевой сессии

Пример узла ABP, совместимый с шлюзом LoPy Nano

from network import LoRa

import socket

import ubinascii

import struct

import time




# Инициализировать LoRa в режиме LORAWAN.

lora = LoRa (mode = LoRa.LORAWAN)




# создать параметры аутентификации ABP

dev_addr = struct.unpack ("> l", ubinascii.unhexlify (‘2601147D‘)) [0] # эти настройки можно найти в TTN

nwk_swkey = ubinascii.unhexlify (‘3C74F4F40CAE2221303BC24284FCF3AF‘) # эти настройки можно найти в TTN

app_swkey = ubinascii.unhexlify (‘0FFA7072CC6FF69A102A0F39BEB0880F‘) # эти настройки можно найти в TTN




# присоединиться к сети с помощью ABP

lora.join(activation=LoRa.ABP, auth=(dev_addr, nwk_swkey, app_swkey))




# удалить все каналы не по умолчанию

for i in range(3, 16):

    lora.remove_channel(i)




# установить 3 канала по умолчанию на одинаковую частоту

lora.add_channel(0, frequency=868100000, dr_min=0, dr_max=5)

lora.add_channel(1, frequency=868100000, dr_min=0, dr_max=5)

lora.add_channel(2, frequency=868100000, dr_min=0, dr_max=5)




# создать сокет LoRa

s = socket.socket (socket.AF_LORA, socket.SOCK_RAW)




# установить скорость передачи данных LoRaWAN

s.setsockopt (socket.SOL_LORA, socket.SO_DR, 5)




# сделать сокет неблокирующим

s.setblocking (False)




Ниже может быть написан ваш собственный код!




for i in range (200):

    s.send(b‘PKT #‘ + bytes([i]))

    time.sleep(4)

    rx = s.recv(256)

    if rx:

        print(rx)

    time.sleep(6)

Приложения TTN

Теперь, когда шлюз и узлы настроены, можно создать приложение TTN; то, что происходит с данными LoRa после их получения TTN. Существует ряд различных настроек / систем, которые можно использовать, однако в следующем примере демонстрируется интеграция HTTP-запроса.

Регистрация приложения

Выберите вкладку «Приложения» в верхней части консоли TTN: откроется экран регистрации приложений. Нажмите «Зарегистр

footer shadow
Контакты

г. Москва, Пятницкое ш. д. 18, пав. 566

zakaz@compacttool.ru

8-495-752-55-22

compacttool logoadaptive site

accepted payment systems

Информация представленная на данном информационном ресурсе преследует исключительно рекламные цели и не является договором-офертой !

© Все права защищены 2015 - 2024г https://compacttool.ru