目录
一、前言
二、上行
1、上行的过程
2、thread_up
3、lgw_receive函数
三、下行
1、下行的过程
2、thread_down
3、thread_jit
4、lgw_send函数
一、前言
本篇文章主要讲解LoRaWAN网关是如何处理上下行消息的,我使用的LoRaWAN网关集中器是sx1302(射频前端是sx1250),接下来,分别从上行的流程、下行的流程进行主要归纳整理。
二、上行
1、上行的过程
节点数据由sx1278射频芯片变成无线电模拟信号发出,我们把有效的消息叫做payload。
集中器收到无线电模拟信号后,会通过8个IF信道解调成数字信号,然后纠错,再将数据存储在LoRa集中器的数据缓冲区中。集中器在一段时间内会通过函数lgw_receive
,通过SPI从寄存器里获取数据(包括payload和元数据,元数据是指负载的出处,例如来自哪一路RF、哪一路IF等),并将其存放到结构体struct lgw_pkt_rx_s
中。网关程序中的上行线程thread_up
会调用lgw_receive
函数,并创建一个指针来接受结构体数组rxpkt
(每一个数组元素就是一个包),然后解析其中的数据并依次封装到buff_up
中,发送PUSH_DATA
包给服务器。发送完成后,服务器会下发一个PUSH_ACK
包来确认已经收到了消息。
2、thread_up
该函数是一个用于LoRaWAN网关的上行数据处理的线程函数。主要功能是从LoRa网关接收数据包(通过lgw_receive函数
),将数据包的元数据和负载转换为JSON格式,然后通过套接字发送到远程服务器并接收服务器的响应。
主要流程:
1、建立与网络服务器的连接:通过UDP套接字与服务器进行通信。
2、从Lora集中器接收数据包:使用lgw_receive
函数从LoRa集中器接收上行数据包。
3、打包数据:将接收到的数据包封装成PUSH_DATA消息。
4、发送数据包到服务器:通过UDP将PUSH_DATA消息发送到服务器。
5、处理服务器响应:接收服务器返回的PUSH_ACK消息,并进行相应处理。
void thread_up(void) {
int i, j, k; /* 循环变量 */
unsigned pkt_in_dgram; /* 当前数据报中的LoRa数据包数量 */
char stat_timestamp[24]; /* 用于存储时间戳的字符串 */
time_t t;
/* 为数据包获取和处理分配内存 */
struct lgw_pkt_rx_s rxpkt[NB_PKT_MAX]; /* 包含传入数据包和元数据的数组 */
struct lgw_pkt_rx_s *p; /* 指向接收数据包的指针 */
int nb_pkt;
/* 本地GPS时间参考的副本 */
bool ref_ok = false; /* 确定是否使用GPS时间参考 */
struct tref local_ref; /* 用于UTC <-> 时间戳转换的时间参考 */
/* 数据缓冲区 */
uint8_t buff_up[TX_BUFF_SIZE]; /* 上行数据包的缓冲区 */
int buff_index;
uint8_t buff_ack[32]; /* 接收确认消息的缓冲区 */
/* 协议变量 */
uint8_t token_h; /* 用于确认匹配的随机令牌 */
uint8_t token_l; /* 用于确认匹配的随机令牌 */
/* ping 测量变量 */
struct timespec send_time;
struct timespec recv_time;
/* GPS 同步变量 */
struct timespec pkt_utc_time;
struct tm * x; /* UTC 时间分解 */
struct timespec pkt_gps_time;
uint64_t pkt_gps_time_ms;
/* 报告管理变量 */
bool send_report = false;
/* mote 信息变量 */
uint32_t mote_addr = 0;
uint16_t mote_fcnt = 0;
/* 设置上行套接字接收超时 */
i = setsockopt(sock_up, SOL_SOCKET, SO_RCVTIMEO, (void *)&push_timeout_half, sizeof push_timeout_half);
if (i != 0) {
MSG("ERROR: [up] setsockopt returned %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
while (!exit_sig && !quit_sig) {
/* 获取数据包 */
pthread_mutex_lock(&mx_concent);
nb_pkt = lgw_receive(NB_PKT_MAX, rxpkt);
pthread_mutex_unlock(&mx_concent);
if (nb_pkt == LGW_HAL_ERROR) {
MSG("ERROR: [up] failed packet fetch, exiting\n");
exit(EXIT_FAILURE);
}
/* 检查是否有要发送的状态报告 */
send_report = report_ready; /* 复制变量以避免在函数执行中更改 */
/* 不需要互斥锁,只需读取 */
/* 如果没有数据包和状态报告要发送,则等待一段时间 */
if ((nb_pkt == 0) && (send_report == false)) {
wait_ms(FETCH_SLEEP_MS);
continue;
}
/* 获取GPS时间参考的副本(避免每个数据包一个互斥锁) */
if ((nb_pkt > 0) && (gps_enabled == true)) {
pthread_mutex_lock(&mx_timeref);
ref_ok = gps_ref_valid;
local_ref = time_reference_gps;
pthread_mutex_unlock(&mx_timeref);
} else {
ref_ok = false;
}
/* 获取统计信息的时间戳 */
t = time(NULL);
strftime(stat_timestamp, sizeof stat_timestamp, "%F %T %Z", gmtime(&t));
MSG_DEBUG(DEBUG_PKT_FWD, "\nCurrent time: %s \n", stat_timestamp);
/* 开始组装数据报文头部 */
token_h = (uint8_t)rand(); /* 随机令牌 */
token_l = (uint8_t)rand(); /* 随机令牌 */
buff_up[1] = token_h;
buff_up[2] = token_l;
buff_index = 12; /* 12字节头部 */
/* 开始JSON结构 */
memcpy((void *)(buff_up + buff_index), (void *)"{\"rxpk\":[", 9);
buff_index += 9;
/* 序列化LoRa数据包元数据和负载 */
pkt_in_dgram = 0;
for (i = 0; i < nb_pkt; ++i) {
p = &rxpkt[i];
/* 从当前数据包获取 mote 信息(地址,fcnt)*/
/* FHDR - DevAddr */
if (p->size >= 8) {
mote_addr = p->payload[1];
mote_addr |= p->payload[2] << 8;
mote_addr |= p->payload[3] << 16;
mote_addr |= p->payload[4] << 24;
/* FHDR - FCnt */
mote_fcnt = p->payload[6];
mote_fcnt |= p->payload[7] << 8;
} else {
mote_addr = 0;
mote_fcnt = 0;
}
/* 基本数据包过滤 */
pthread_mutex_lock(&mx_meas_up);
meas_nb_rx_rcv += 1;
switch(p->status) {
case STAT_CRC_OK:
meas_nb_rx_ok += 1;
if (!fwd_valid_pkt) {
pthread_mutex_unlock(&mx_meas_up);
continue; /* 跳过该数据包 */
}
break;
case STAT_CRC_BAD:
meas_nb_rx_bad += 1;
if (!fwd_error_pkt) {
pthread_mutex_unlock(&mx_meas_up);
continue; /* 跳过该数据包 */
}
break;
case STAT_NO_CRC:
meas_nb_rx_nocrc += 1;
if (!fwd_nocrc_pkt) {
pthread_mutex_unlock(&mx_meas_up);
continue; /* 跳过该数据包 */
}
break;
default:
MSG("WARNING: [up] received packet with unknown status %u (size %u, modulation %u, BW %u, DR %u, RSSI %.1f)\n", p->status, p->size, p->modulation, p->bandwidth, p->datarate, p->rssic);
pthread_mutex_unlock(&mx_meas_up);
continue; /* 跳过该数据包 */
}
meas_up_pkt_fwd += 1;
meas_up_payload_byte += p->size;
pthread_mutex_unlock(&mx_meas_up);
printf( "\nINFO: Received pkt from mote: %08X (fcnt=%u)\n", mote_addr, mote_fcnt );
/* 数据包开始,如果需要添加数据包间分隔符 */
if (pkt_in_dgram == 0) {
buff_up[buff_index] = '{';
++buff_index;
} else {
buff_up[buff_index] = ',';
buff_up[buff_index+1] = '{';
buff_index += 2;
}
/* JSON rxpk帧格式版本,8个字符 */
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, "\"jver\":%d", PROTOCOL_JSON_RXPK_FRAME_FORMAT );
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
/* RAW时间戳,8-17个字符 */
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, ",\"tmst\":%u", p->count_us);
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
/* 包接收时间(基于GPS),37个字符 */
if (ref_ok == true) {
/* 将数据包时间戳转换为UTC绝对时间 */
j = lgw_cnt2utc(local_ref, p->count_us, &pkt_utc_time);
if (j == LGW_GPS_SUCCESS) {
/* 将UNIX时间戳拆分为其日历组件 */
x = gmtime(&(pkt_utc_time.tv_sec));
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, ",\"time\":\"%04i-%02i-%02iT%02i:%02i:%02i.%06liZ\"", (x->tm_year)+1900, (x->tm_mon)+1, x->tm_mday, x->tm_hour, x->tm_min, x->tm_sec, (pkt_utc_time.tv_nsec)/1000); /* ISO 8601格式 */
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
} else {
MSG("ERROR: [up] lgw_cnt2utc failed line %u\n", (__LINE__ - 2));
exit(EXIT_FAILURE);
}
}
/* 包接收时间(基于GPS),不少于16个字符 */
if (ref_ok == true) {
/* 将数据包时间戳转换为UNIX时间 */
j = lgw_cnt2gps(local_ref, p->count_us, &pkt_gps_time);
if (j == LGW_GPS_SUCCESS) {
/* 将GPS时间戳转换为毫秒数 */
pkt_gps_time_ms = (uint64_t)pkt_gps_time.tv_sec * 1e3 + pkt_gps_time.tv_nsec / 1e6;
/* JSON结构中的gts时间 */
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, ",\"gps_time_ms\":%llu", pkt_gps_time_ms);
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
} else {
MSG("ERROR: [up] lgw_cnt2gps failed line %u\n", (__LINE__ - 2));
exit(EXIT_FAILURE);
}
}
/* Packet MetaData, RF chains, RSSI, etc... */
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, ",\"rssi\":%.0f", p->rssic);
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, ",\"lsnr\":%.1f", p->snr);
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
if (p->status == STAT_CRC_OK) {
j = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, ",\"size\":%u,\"data\":\"", p->size);
if (j > 0) {
buff_index += j;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
/* RF packet payload to be converted into JSON object */
for (j = 0; j < p->size; ++j) {
k = snprintf((char *)(buff_up + buff_index), TX_BUFF_SIZE-buff_index, "%02x", p->payload[j]);
if (k > 0) {
buff_index += k;
} else {
MSG("ERROR: [up] snprintf failed line %u\n", (__LINE__ - 4));
exit(EXIT_FAILURE);
}
}
buff_up[buff_index] = '"';
++buff_index;
}
pkt_in_dgram += 1;
/* 最后一个数据包完成 */
if (pkt_in_dgram == NB_PKT_MAX) {
break;
}
}
/* 组装JSON结构的结尾 */
buff_up[buff_index] = ']';
buff_up[buff_index+1] = '}';
buff_index += 2;
/* 发送数据包 */
send(sock_up, (void *)buff_up, buff_index, 0);
/* 记录发送的数据 */
MSG("PKT SEND, freq %u, datarate %u, mod %u, count_us %u\n", rxpkt[0].freq_hz, rxpkt[0].datarate, rxpkt[0].modulation, rxpkt[0].count_us);
/* 数据包统计 */
pthread_mutex_lock(&mx_meas_up);
meas_up_pkt_sent += 1;
pthread_mutex_unlock(&mx_meas_up);
/* 等待随机时间以避免碰撞 */
wait_ms(rand()%100);
/* 如果有状态报告,则将其发送 */
if (send_report == true) {
/* 调用状态报告发送功能 */
send_report = false; /* 清除标志 */
}
}
MSG("\nINFO: End of upstream thread\n");
/* 清理资源 */
close(sock_up);
pthread_cleanup_pop(1); /* 结束清理 */
}
3、lgw_receive函数
int lgw_receive(uint8_t max_pkt, struct lgw_pkt_rx_s *pkt_data);
max_pkt
:用户提供的缓冲区中可以存储的最大数据包数量。pkt_data
:指向一个数组,该数组用于存储接收到的数据包。
lgw_receive
函数的主要目的是从LoRa集中器的接收缓冲区中获取数据包,并将这些数据包解析后存储在用户提供的缓冲区中。
工作流程:
1、初始化变量:初始化用于存储接收到的数据包数量、当前温度、RSSI温度补偿值等的变量。
2、记录函数开始时间:用于性能测量。
3、从sx1302获取数据包:调用sx1302_fetch
函数从SX1302中获取数据包数量。
4、更新内部计数器:调用sx1302_update
函数更新SX1302的状态。
5、检查是否有数据包:如果没有数据包,则返回0。
6、处理接收到的数据包:
- 如果接收到的数据包数量大于
max_pkt
,则记录警告信息。 - 遍历接收到的数据包并进行解析,将解析后的数据存储在
pkt_data
数组中。 - 应用RSSI温度补偿。
7、去除重复数据包:如果启用了精确时间戳,则去除重复的数据包。
8、记录函数结束时间
9、返回接收到数据包的数量
int lgw_receive(uint8_t max_pkt, struct lgw_pkt_rx_s *pkt_data) {
int res; // 用于存储函数调用返回结果的变量
uint8_t nb_pkt_fetched = 0; // 存储从SX1302获取的数据包数量
uint8_t nb_pkt_found = 0; // 记录已经找到并处理的数据包数量
uint8_t nb_pkt_left = 0; // 记录未处理的数据包数量
float current_temperature = 0.0, rssi_temperature_offset = 0.0; // 存储当前温度和RSSI温度补偿值
struct timeval tm; // 用于记录函数执行时间的结构体
DEBUG_PRINTF(" --- %s\n", "IN"); // 打印调试信息,标记函数开始执行
/* Record function start time */
_meas_time_start(&tm); // 记录函数开始执行的时间点
/* Get packets from SX1302, if any */
res = sx1302_fetch(&nb_pkt_fetched); // 从SX1302芯片获取数据包数量
if (res != LGW_REG_SUCCESS) { // 检查获取数据包是否成功
printf("ERROR: failed to fetch packets from SX1302\n");
return LGW_HAL_ERROR; // 获取失败,返回错误状态
}
/* Update internal counter */
res = sx1302_update(); // 更新SX1302内部状态
if (res != LGW_REG_SUCCESS) { // 检查更新状态是否成功
return LGW_HAL_ERROR; // 更新失败,返回错误状态
}
/* Exit now if no packet fetched */
if (nb_pkt_fetched == 0) { // 如果未获取到数据包,则立即退出函数
_meas_time_stop(1, tm, __FUNCTION__); // 停止测量函数执行时间,并记录
return 0; // 返回接收到的数据包数量为0
}
/* Warn if more packets fetched than expected */
if (nb_pkt_fetched > max_pkt) { // 如果实际获取的数据包数量超过了预期的最大数量
nb_pkt_left = nb_pkt_fetched - max_pkt; // 计算未处理的数据包数量
printf("WARNING: not enough space allocated, fetched %d packet(s), %d will be left in RX buffer\n", nb_pkt_fetched, nb_pkt_left);
}
/* Apply RSSI temperature compensation */
// 以下部分为应用RSSI温度补偿的代码段,这部分目前被注释掉了(#if 0),可能是未启用或者调试中的功能
/* Iterate on the RX buffer to get parsed packets */
// 迭代处理接收到的数据包
for (nb_pkt_found = 0; nb_pkt_found < ((nb_pkt_fetched <= max_pkt) ? nb_pkt_fetched : max_pkt); nb_pkt_found++) {
/* Get packet and move to next one */
res = sx1302_parse(&lgw_context, &pkt_data[nb_pkt_found]); // 解析接收到的数据包
if (res == LGW_REG_WARNING) { // 如果解析警告
printf("WARNING: parsing error on packet %d, discarding fetched packets\n", nb_pkt_found);
return LGW_HAL_SUCCESS; // 解析错误但不致命,返回成功状态
} else if (res == LGW_REG_ERROR) { // 如果解析错误
printf("ERROR: fatal parsing error on packet %d, aborting...\n", nb_pkt_found);
return LGW_HAL_ERROR; // 解析错误,返回错误状态
}
/* Apply RSSI offset calibrated for the board */
// 应用与板子校准的RSSI偏移
pkt_data[nb_pkt_found].rssic += CONTEXT_RF_CHAIN[pkt_data[nb_pkt_found].rf_chain].rssi_offset;
pkt_data[nb_pkt_found].rssis += CONTEXT_RF_CHAIN[pkt_data[nb_pkt_found].rf_chain].rssi_offset;
// 获取RSSI温度补偿值
rssi_temperature_offset = sx1302_rssi_get_temperature_offset(&CONTEXT_RF_CHAIN[pkt_data[nb_pkt_found].rf_chain].rssi_tcomp, current_temperature);
// 应用RSSI温度补偿
pkt_data[nb_pkt_found].rssic += rssi_temperature_offset;
pkt_data[nb_pkt_found].rssis += rssi_temperature_offset;
// 打印调试信息,显示应用的RSSI温度补偿值和当前温度
DEBUG_PRINTF("INFO: RSSI temperature offset applied: %.3f dB (current temperature %.1f C)\n", rssi_temperature_offset, current_temperature);
}
DEBUG_PRINTF("INFO: nb pkt found:%u left:%u\n", nb_pkt_found, nb_pkt_left);
/* Remove duplicated packets generated by double demod when precision timestamp is enabled */
// 如果启用精确时间戳,则移除由于双重解调而生成的重复数据包
if ((nb_pkt_found > 0) && (CONTEXT_FINE_TIMESTAMP.enable == true)) {
res = merge_packets(pkt_data, &nb_pkt_found); // 调用函数移除重复数据包
if (res != 0) {
printf("WARNING: failed to remove duplicated packets\n");
}
DEBUG_PRINTF("INFO: nb pkt found:%u (after de-duplicating)\n", nb_pkt_found);
}
_meas_time_stop(1, tm, __FUNCTION__); // 停止测量函数执行时间,并记录
DEBUG_PRINTF(" --- %s\n", "OUT"); // 打印调试信息,标记函数执行结束
return nb_pkt_found; // 返回接收到并处理的数据包数量
}
三、下行
1、下行的过程
网关收到服务器的PULL_RESP packet以后。在线程thread_down会将buff_down里面的数据解析出来并放到txpkt结构体中(注意这里不是结构体数组,因为下行链路只有一个),会向服务器发一个TX_ACK包确认收到。消息实体里面的playload会通过thread_down里调用函数b64_to_bin转化成二进制,然后将其放到JIT队列中,JIT队列存在的意义是防止下行消息和网关正常操作的冲突,所以这里面设置了一个锁mx_connect来控制对集中器的访问,集中器空闲后调用lgw_send函数(里面对txpkt进行了编码)通过SPI将服务器下发的元数据(包括用哪个信道发送,发送频率等)写到寄存器中,然后集中器从FIFO和数据缓存区拿playload和元数据并调制发送到节点。
注意,下行的时候需要控制两个线程
thread_down:负责接收服务器下发的下行数据(PULL_RESP),并将数据解析和处理后放入JIT队列中。
thread_jit:负责管理JIT队列,确保按照LoRaWAN协议规定的时隙和时间窗口发送下行数据。
2、thread_down
thread_down
函数是网关中处理下行数据包的线程函数。其主要任务是从服务器接收下行数据包(PULL_RESP packet),将其解析并传递给LoRa集中器进行发送。
工作流程:
1、初始化和进入循环:thread_down
函数通常是一个无限循环,持续运行以不断接收服务器的下行数据包。它会初始化一些变量和网络连接,然后进入循环等待服务器发送的PULL_RESP数据包。
2、接收数据包:在循环中,thread_down
通过套接字接收来自服务器的PULL_RESP数据包。这个过程通常使用阻塞式的recv
函数,等待数据包到达。
3、解析数据包:接收到PULL_RESP数据包后,thread_down
会对数据包进行解析。具体来说,它会解析JSON格式的数据,提取出下行数据的各个字段,例如目标节点地址、发送频率、功率等。
4、填充txpkt结构体:解析完成后,thread_down
将提取的数据存入一个txpkt
结构体中。这个结构体包含了下行消息的所有必要信息,包括要发送的payload数据、发送时间、频率、功率等。
5、发送确认(TX_ACK):为了确认数据包已成功接收和处理,thread_down
会向服务器发送一个TX_ACK数据包。这是一个确认消息,告诉服务器网关已经成功接收并准备发送下行数据包。
6、将数据放入JIT队列:thread_down
会将填充好的txpkt
结构体放入JIT队列中。JIT队列管理所有待发送的下行数据包,确保它们能够在正确的时间发送,以避免冲突和干扰。
7、使用锁控制访问:在将数据放入JIT队列时,thread_down
使用锁(如mx_connect
)来控制对集中器的访问。这确保了在高并发环境下对集中器的操作是安全的,不会出现竞争条件。
8、调用lgw_send函数:当JIT队列中有数据包准备发送且集中器空闲时,thread_down
调用lgw_send
函数。lgw_send
函数负责将txpkt
结构体中的数据通过SPI接口发送到集中器。集中器会从FIFO和数据缓冲区获取这些数据,并进行调制和发送。
void thread_down(void) {
int i; /* loop variables */
/* configuration and metadata for an outbound packet */
struct lgw_pkt_tx_s txpkt;
bool sent_immediate = false; /* option to sent the packet immediately */
/* local timekeeping variables */
struct timespec send_time; /* time of the pull request */
struct timespec recv_time; /* time of return from recv socket call */
/* data buffers */
uint8_t buff_down[1000]; /* buffer to receive downstream packets */
uint8_t buff_req[12]; /* buffer to compose pull requests */
int msg_len;
/* protocol variables */
uint8_t token_h; /* random token for acknowledgement matching */
uint8_t token_l; /* random token for acknowledgement matching */
bool req_ack = false; /* keep track of whether PULL_DATA was acknowledged or not */
/* JSON parsing variables */
JSON_Value *root_val = NULL;
JSON_Object *txpk_obj = NULL;
JSON_Value *val = NULL; /* needed to detect the absence of some fields */
const char *str; /* pointer to sub-strings in the JSON data */
short x0, x1;
uint64_t x2;
double x3, x4;
/* variables to send on GPS timestamp */
struct tref local_ref; /* time reference used for GPS <-> timestamp conversion */
struct timespec gps_tx; /* GPS time that needs to be converted to timestamp */
/* beacon variables */
struct lgw_pkt_tx_s beacon_pkt;
uint8_t beacon_chan;
uint8_t beacon_loop;
size_t beacon_RFU1_size = 0;
size_t beacon_RFU2_size = 0;
uint8_t beacon_pyld_idx = 0;
time_t diff_beacon_time;
struct timespec next_beacon_gps_time; /* gps time of next beacon packet */
struct timespec last_beacon_gps_time; /* gps time of last enqueued beacon packet */
int retry;
/* beacon data fields, byte 0 is Least Significant Byte */
int32_t field_latitude; /* 3 bytes, derived from reference latitude */
int32_t field_longitude; /* 3 bytes, derived from reference longitude */
uint16_t field_crc1, field_crc2;
/* auto-quit variable */
uint32_t autoquit_cnt = 0; /* count the number of PULL_DATA sent since the latest PULL_ACK */
/* Just In Time downlink */
uint32_t current_concentrator_time;
enum jit_error_e jit_result = JIT_ERROR_OK;
enum jit_pkt_type_e downlink_type;
enum jit_error_e warning_result = JIT_ERROR_OK;
int32_t warning_value = 0;
uint8_t tx_lut_idx = 0;
/* set downstream socket RX timeout */
i = setsockopt(sock_down, SOL_SOCKET, SO_RCVTIMEO, (void *)&pull_timeout, sizeof pull_timeout);
if (i != 0) {
MSG("ERROR: [down] setsockopt returned %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
/* pre-fill the pull request buffer with fixed fields */
buff_req[0] = PROTOCOL_VERSION;
buff_req[3] = PKT_PULL_DATA;
*(uint32_t *)(buff_req + 4) = net_mac_h;
*(uint32_t *)(buff_req + 8) = net_mac_l;
/* beacon variables initialization */
last_beacon_gps_time.tv_sec = 0;
last_beacon_gps_time.tv_nsec = 0;
/* beacon packet parameters */
beacon_pkt.tx_mode = ON_GPS; /* send on PPS pulse */
beacon_pkt.rf_chain = 0; /* antenna A */
beacon_pkt.rf_power = beacon_power;
beacon_pkt.modulation = MOD_LORA;
switch (beacon_bw_hz) {
case 125000:
beacon_pkt.bandwidth = BW_125KHZ;
break;
case 500000:
beacon_pkt.bandwidth = BW_500KHZ;
break;
default:
/* should not happen */
MSG("ERROR: unsupported bandwidth for beacon\n");
exit(EXIT_FAILURE);
}
switch (beacon_datarate) {
case 8:
beacon_pkt.datarate = DR_LORA_SF8;
beacon_RFU1_size = 1;
beacon_RFU2_size = 3;
break;
case 9:
beacon_pkt.datarate = DR_LORA_SF9;
beacon_RFU1_size = 2;
beacon_RFU2_size = 0;
break;
case 10:
beacon_pkt.datarate = DR_LORA_SF10;
beacon_RFU1_size = 3;
beacon_RFU2_size = 1;
break;
case 12:
beacon_pkt.datarate = DR_LORA_SF12;
beacon_RFU1_size = 5;
beacon_RFU2_size = 3;
break;
default:
/* should not happen */
MSG("ERROR: unsupported datarate for beacon\n");
exit(EXIT_FAILURE);
}
beacon_pkt.size = beacon_RFU1_size + 4 + 2 + 7 + beacon_RFU2_size + 2;
beacon_pkt.coderate = CR_LORA_4_5;
beacon_pkt.invert_pol = false;
beacon_pkt.preamble = 10;
beacon_pkt.no_crc = true;
beacon_pkt.no_header = true;
/* network common part beacon fields (little endian) */
for (i = 0; i < (int)beacon_RFU1_size; i++) {
beacon_pkt.payload[beacon_pyld_idx++] = 0x0;
}
/* network common part beacon fields (little endian) */
beacon_pyld_idx += 4; /* time (variable), filled later */
beacon_pyld_idx += 2; /* crc1 (variable), filled later */
/* calculate the latitude and longitude that must be publicly reported */
field_latitude = (int32_t)((reference_coord.lat / 90.0) * (double)(1<<23));
if (field_latitude > (int32_t)0x007FFFFF) {
field_latitude = (int32_t)0x007FFFFF; /* +90 N is represented as 89.99999 N */
} else if (field_latitude < (int32_t)0xFF800000) {
field_latitude = (int32_t)0xFF800000;
}
field_longitude = (int32_t)((reference_coord.lon / 180.0) * (double)(1<<23));
if (field_longitude > (int32_t)0x007FFFFF) {
field_longitude = (int32_t)0x007FFFFF; /* +180 E is represented as 179.99999 E */
} else if (field_longitude < (int32_t)0xFF800000) {
field_longitude = (int32_t)0xFF800000;
}
/* gateway specific beacon fields */
beacon_pkt.payload[beacon_pyld_idx++] = beacon_infodesc;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & field_latitude;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (field_latitude >> 8);
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (field_latitude >> 16);
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & field_longitude;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (field_longitude >> 8);
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (field_longitude >> 16);
/* RFU */
for (i = 0; i < (int)beacon_RFU2_size; i++) {
beacon_pkt.payload[beacon_pyld_idx++] = 0x0;
}
/* CRC of the beacon gateway specific part fields */
field_crc2 = crc16((beacon_pkt.payload + 6 + beacon_RFU1_size), 7 + beacon_RFU2_size);
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & field_crc2;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (field_crc2 >> 8);
/* JIT queue initialization */
jit_queue_init(&jit_queue[0]);
jit_queue_init(&jit_queue[1]);
while (!exit_sig && !quit_sig) {
/* auto-quit if the threshold is crossed */
if ((autoquit_threshold > 0) && (autoquit_cnt >= autoquit_threshold)) {
exit_sig = true;
MSG("INFO: [down] the last %u PULL_DATA were not ACKed, exiting application\n", autoquit_threshold);
break;
}
/* generate random token for request */
token_h = (uint8_t)rand(); /* random token */
token_l = (uint8_t)rand(); /* random token */
buff_req[1] = token_h;
buff_req[2] = token_l;
/* send PULL request and record time */
send(sock_down, (void *)buff_req, sizeof buff_req, 0);
clock_gettime(CLOCK_MONOTONIC, &send_time);
pthread_mutex_lock(&mx_meas_dw);
meas_dw_pull_sent += 1;
pthread_mutex_unlock(&mx_meas_dw);
req_ack = false;
autoquit_cnt++;
/* listen to packets and process them until a new PULL request must be sent */
recv_time = send_time;
while (((int)difftimespec(recv_time, send_time) < keepalive_time) && !exit_sig && !quit_sig) {
/* try to receive a datagram */
msg_len = recv(sock_down, (void *)buff_down, (sizeof buff_down)-1, 0);
clock_gettime(CLOCK_MONOTONIC, &recv_time);
/* Pre-allocate beacon slots in JiT queue, to check downlink collisions */
beacon_loop = JIT_NUM_BEACON_IN_QUEUE - jit_queue[0].num_beacon;
retry = 0;
while (beacon_loop && (beacon_period != 0)) {
pthread_mutex_lock(&mx_timeref);
/* Wait for GPS to be ready before inserting beacons in JiT queue */
if ((gps_ref_valid == true) && (xtal_correct_ok == true)) {
/* compute GPS time for next beacon to come */
/* LoRaWAN: T = k*beacon_period + TBeaconDelay */
/* with TBeaconDelay = [1.5ms +/- 1µs]*/
if (last_beacon_gps_time.tv_sec == 0) {
/* if no beacon has been queued, get next slot from current GPS time */
diff_beacon_time = time_reference_gps.gps.tv_sec % ((time_t)beacon_period);
next_beacon_gps_time.tv_sec = time_reference_gps.gps.tv_sec +
((time_t)beacon_period - diff_beacon_time);
} else {
/* if there is already a beacon, take it as reference */
next_beacon_gps_time.tv_sec = last_beacon_gps_time.tv_sec + beacon_period;
}
/* now we can add a beacon_period to the reference to get next beacon GPS time */
next_beacon_gps_time.tv_sec += (retry * beacon_period);
next_beacon_gps_time.tv_nsec = 0;
#if DEBUG_BEACON
{
time_t time_unix;
time_unix = time_reference_gps.gps.tv_sec + UNIX_GPS_EPOCH_OFFSET;
MSG_DEBUG(DEBUG_BEACON, "GPS-now : %s", ctime(&time_unix));
time_unix = last_beacon_gps_time.tv_sec + UNIX_GPS_EPOCH_OFFSET;
MSG_DEBUG(DEBUG_BEACON, "GPS-last: %s", ctime(&time_unix));
time_unix = next_beacon_gps_time.tv_sec + UNIX_GPS_EPOCH_OFFSET;
MSG_DEBUG(DEBUG_BEACON, "GPS-next: %s", ctime(&time_unix));
}
#endif
/* convert GPS time to concentrator time, and set packet counter for JiT trigger */
lgw_gps2cnt(time_reference_gps, next_beacon_gps_time, &(beacon_pkt.count_us));
pthread_mutex_unlock(&mx_timeref);
/* apply frequency correction to beacon TX frequency */
if (beacon_freq_nb > 1) {
beacon_chan = (next_beacon_gps_time.tv_sec / beacon_period) % beacon_freq_nb; /* floor rounding */
} else {
beacon_chan = 0;
}
/* Compute beacon frequency */
beacon_pkt.freq_hz = beacon_freq_hz + (beacon_chan * beacon_freq_step);
/* load time in beacon payload */
beacon_pyld_idx = beacon_RFU1_size;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & next_beacon_gps_time.tv_sec;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (next_beacon_gps_time.tv_sec >> 8);
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (next_beacon_gps_time.tv_sec >> 16);
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (next_beacon_gps_time.tv_sec >> 24);
/* calculate CRC */
field_crc1 = crc16(beacon_pkt.payload, 4 + beacon_RFU1_size); /* CRC for the network common part */
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & field_crc1;
beacon_pkt.payload[beacon_pyld_idx++] = 0xFF & (field_crc1 >> 8);
/* Insert beacon packet in JiT queue */
pthread_mutex_lock(&mx_concent);
lgw_get_instcnt(¤t_concentrator_time);
pthread_mutex_unlock(&mx_concent);
jit_result = jit_enqueue(&jit_queue[0], current_concentrator_time, &beacon_pkt, JIT_PKT_TYPE_BEACON);
if (jit_result == JIT_ERROR_OK) {
/* update stats */
pthread_mutex_lock(&mx_meas_dw);
meas_nb_beacon_queued += 1;
pthread_mutex_unlock(&mx_meas_dw);
/* One more beacon in the queue */
beacon_loop--;
retry = 0;
last_beacon_gps_time.tv_sec = next_beacon_gps_time.tv_sec; /* keep this beacon time as reference for next one to be programmed */
/* display beacon payload */
MSG("INFO: Beacon queued (count_us=%u, freq_hz=%u, size=%u):\n", beacon_pkt.count_us, beacon_pkt.freq_hz, beacon_pkt.size);
printf( " => " );
for (i = 0; i < beacon_pkt.size; ++i) {
MSG("%02X ", beacon_pkt.payload[i]);
}
MSG("\n");
} else {
MSG_DEBUG(DEBUG_BEACON, "--> beacon queuing failed with %d\n", jit_result);
/* update stats */
pthread_mutex_lock(&mx_meas_dw);
if (jit_result != JIT_ERROR_COLLISION_BEACON) {
meas_nb_beacon_rejected += 1;
}
pthread_mutex_unlock(&mx_meas_dw);
/* In case previous enqueue failed, we retry one period later until it succeeds */
/* Note: In case the GPS has been unlocked for a while, there can be lots of retries */
/* to be done from last beacon time to a new valid one */
retry++;
MSG_DEBUG(DEBUG_BEACON, "--> beacon queuing retry=%d\n", retry);
}
} else {
pthread_mutex_unlock(&mx_timeref);
break;
}
}
/* if no network message was received, got back to listening sock_down socket */
if (msg_len == -1) {
//MSG("WARNING: [down] recv returned %s\n", strerror(errno)); /* too verbose */
continue;
}
/* if the datagram does not respect protocol, just ignore it */
if ((msg_len < 4) || (buff_down[0] != PROTOCOL_VERSION) || ((buff_down[3] != PKT_PULL_RESP) && (buff_down[3] != PKT_PULL_ACK))) {
MSG("WARNING: [down] ignoring invalid packet len=%d, protocol_version=%d, id=%d\n",
msg_len, buff_down[0], buff_down[3]);
continue;
}
/* if the datagram is an ACK, check token */
if (buff_down[3] == PKT_PULL_ACK) {
if ((buff_down[1] == token_h) && (buff_down[2] == token_l)) {
if (req_ack) {
MSG("INFO: [down] duplicate ACK received :)\n");
} else { /* if that packet was not already acknowledged */
req_ack = true;
autoquit_cnt = 0;
pthread_mutex_lock(&mx_meas_dw);
meas_dw_ack_rcv += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG("INFO: [down] PULL_ACK received in %i ms\n", (int)(1000 * difftimespec(recv_time, send_time)));
}
} else { /* out-of-sync token */
MSG("INFO: [down] received out-of-sync ACK\n");
}
continue;
}
/* the datagram is a PULL_RESP */
buff_down[msg_len] = 0; /* add string terminator, just to be safe */
MSG("INFO: [down] PULL_RESP received - token[%d:%d] :)\n", buff_down[1], buff_down[2]); /* very verbose */
printf("\nJSON down: %s\n", (char *)(buff_down + 4)); /* DEBUG: display JSON payload */
/* initialize TX struct and try to parse JSON */
memset(&txpkt, 0, sizeof txpkt);
root_val = json_parse_string_with_comments((const char *)(buff_down + 4)); /* JSON offset */
if (root_val == NULL) {
MSG("WARNING: [down] invalid JSON, TX aborted\n");
continue;
}
/* look for JSON sub-object 'txpk' */
txpk_obj = json_object_get_object(json_value_get_object(root_val), "txpk");
if (txpk_obj == NULL) {
MSG("WARNING: [down] no \"txpk\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
/* Parse "immediate" tag, or target timestamp, or UTC time to be converted by GPS (mandatory) */
i = json_object_get_boolean(txpk_obj,"imme"); /* can be 1 if true, 0 if false, or -1 if not a JSON boolean */
if (i == 1) {
/* TX procedure: send immediately */
sent_immediate = true;
downlink_type = JIT_PKT_TYPE_DOWNLINK_CLASS_C;
MSG("INFO: [down] a packet will be sent in \"immediate\" mode\n");
} else {
sent_immediate = false;
val = json_object_get_value(txpk_obj,"tmst");
if (val != NULL) {
/* TX procedure: send on timestamp value */
txpkt.count_us = (uint32_t)json_value_get_number(val);
/* Concentrator timestamp is given, we consider it is a Class A downlink */
downlink_type = JIT_PKT_TYPE_DOWNLINK_CLASS_A;
} else {
/* TX procedure: send on GPS time (converted to timestamp value) */
val = json_object_get_value(txpk_obj, "tmms");
if (val == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.tmst\" or \"txpk.tmms\" objects in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
if (gps_enabled == true) {
pthread_mutex_lock(&mx_timeref);
if (gps_ref_valid == true) {
local_ref = time_reference_gps;
pthread_mutex_unlock(&mx_timeref);
} else {
pthread_mutex_unlock(&mx_timeref);
MSG("WARNING: [down] no valid GPS time reference yet, impossible to send packet on specific GPS time, TX aborted\n");
json_value_free(root_val);
/* send acknoledge datagram to server */
send_tx_ack(buff_down[1], buff_down[2], JIT_ERROR_GPS_UNLOCKED, 0);
continue;
}
} else {
MSG("WARNING: [down] GPS disabled, impossible to send packet on specific GPS time, TX aborted\n");
json_value_free(root_val);
/* send acknoledge datagram to server */
send_tx_ack(buff_down[1], buff_down[2], JIT_ERROR_GPS_UNLOCKED, 0);
continue;
}
/* Get GPS time from JSON */
x2 = (uint64_t)json_value_get_number(val);
/* Convert GPS time from milliseconds to timespec */
x3 = modf((double)x2/1E3, &x4);
gps_tx.tv_sec = (time_t)x4; /* get seconds from integer part */
gps_tx.tv_nsec = (long)(x3 * 1E9); /* get nanoseconds from fractional part */
/* transform GPS time to timestamp */
i = lgw_gps2cnt(local_ref, gps_tx, &(txpkt.count_us));
if (i != LGW_GPS_SUCCESS) {
MSG("WARNING: [down] could not convert GPS time to timestamp, TX aborted\n");
json_value_free(root_val);
continue;
} else {
MSG("INFO: [down] a packet will be sent on timestamp value %u (calculated from GPS time)\n", txpkt.count_us);
}
/* GPS timestamp is given, we consider it is a Class B downlink */
downlink_type = JIT_PKT_TYPE_DOWNLINK_CLASS_B;
}
}
/* Parse "No CRC" flag (optional field) */
val = json_object_get_value(txpk_obj,"ncrc");
if (val != NULL) {
txpkt.no_crc = (bool)json_value_get_boolean(val);
}
/* Parse "No header" flag (optional field) */
val = json_object_get_value(txpk_obj,"nhdr");
if (val != NULL) {
txpkt.no_header = (bool)json_value_get_boolean(val);
}
/* parse target frequency (mandatory) */
val = json_object_get_value(txpk_obj,"freq");
if (val == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.freq\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
txpkt.freq_hz = (uint32_t)((double)(1.0e6) * json_value_get_number(val));
/* parse RF chain used for TX (mandatory) */
val = json_object_get_value(txpk_obj,"rfch");
if (val == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.rfch\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
txpkt.rf_chain = (uint8_t)json_value_get_number(val);
if (tx_enable[txpkt.rf_chain] == false) {
MSG("WARNING: [down] TX is not enabled on RF chain %u, TX aborted\n", txpkt.rf_chain);
json_value_free(root_val);
continue;
}
/* parse TX power (optional field) */
val = json_object_get_value(txpk_obj,"powe");
if (val != NULL) {
txpkt.rf_power = (int8_t)json_value_get_number(val) - antenna_gain;
}
/* Parse modulation (mandatory) */
str = json_object_get_string(txpk_obj, "modu");
if (str == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.modu\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
if (strcmp(str, "LORA") == 0) {
/* Lora modulation */
txpkt.modulation = MOD_LORA;
/* Parse Lora spreading-factor and modulation bandwidth (mandatory) */
str = json_object_get_string(txpk_obj, "datr");
if (str == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.datr\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
i = sscanf(str, "SF%2hdBW%3hd", &x0, &x1);
if (i != 2) {
MSG("WARNING: [down] format error in \"txpk.datr\", TX aborted\n");
json_value_free(root_val);
continue;
}
switch (x0) {
case 5: txpkt.datarate = DR_LORA_SF5; break;
case 6: txpkt.datarate = DR_LORA_SF6; break;
case 7: txpkt.datarate = DR_LORA_SF7; break;
case 8: txpkt.datarate = DR_LORA_SF8; break;
case 9: txpkt.datarate = DR_LORA_SF9; break;
case 10: txpkt.datarate = DR_LORA_SF10; break;
case 11: txpkt.datarate = DR_LORA_SF11; break;
case 12: txpkt.datarate = DR_LORA_SF12; break;
default:
MSG("WARNING: [down] format error in \"txpk.datr\", invalid SF, TX aborted\n");
json_value_free(root_val);
continue;
}
switch (x1) {
case 125: txpkt.bandwidth = BW_125KHZ; break;
case 250: txpkt.bandwidth = BW_250KHZ; break;
case 500: txpkt.bandwidth = BW_500KHZ; break;
default:
MSG("WARNING: [down] format error in \"txpk.datr\", invalid BW, TX aborted\n");
json_value_free(root_val);
continue;
}
/* Parse ECC coding rate (optional field) */
str = json_object_get_string(txpk_obj, "codr");
if (str == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.codr\" object in json, TX aborted\n");
json_value_free(root_val);
continue;
}
if (strcmp(str, "4/5") == 0) txpkt.coderate = CR_LORA_4_5;
else if (strcmp(str, "4/6") == 0) txpkt.coderate = CR_LORA_4_6;
else if (strcmp(str, "2/3") == 0) txpkt.coderate = CR_LORA_4_6;
else if (strcmp(str, "4/7") == 0) txpkt.coderate = CR_LORA_4_7;
else if (strcmp(str, "4/8") == 0) txpkt.coderate = CR_LORA_4_8;
else if (strcmp(str, "1/2") == 0) txpkt.coderate = CR_LORA_4_8;
else {
MSG("WARNING: [down] format error in \"txpk.codr\", TX aborted\n");
json_value_free(root_val);
continue;
}
/* Parse signal polarity switch (optional field) */
val = json_object_get_value(txpk_obj,"ipol");
if (val != NULL) {
txpkt.invert_pol = (bool)json_value_get_boolean(val);
}
/* parse Lora preamble length (optional field, optimum min value enforced) */
val = json_object_get_value(txpk_obj,"prea");
if (val != NULL) {
i = (int)json_value_get_number(val);
if (i >= MIN_LORA_PREAMB) {
txpkt.preamble = (uint16_t)i;
} else {
txpkt.preamble = (uint16_t)MIN_LORA_PREAMB;
}
} else {
txpkt.preamble = (uint16_t)STD_LORA_PREAMB;
}
} else if (strcmp(str, "FSK") == 0) {
/* FSK modulation */
txpkt.modulation = MOD_FSK;
/* parse FSK bitrate (mandatory) */
val = json_object_get_value(txpk_obj,"datr");
if (val == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.datr\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
txpkt.datarate = (uint32_t)(json_value_get_number(val));
/* parse frequency deviation (mandatory) */
val = json_object_get_value(txpk_obj,"fdev");
if (val == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.fdev\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
txpkt.f_dev = (uint8_t)(json_value_get_number(val) / 1000.0); /* JSON value in Hz, txpkt.f_dev in kHz */
/* parse FSK preamble length (optional field, optimum min value enforced) */
val = json_object_get_value(txpk_obj,"prea");
if (val != NULL) {
i = (int)json_value_get_number(val);
if (i >= MIN_FSK_PREAMB) {
txpkt.preamble = (uint16_t)i;
} else {
txpkt.preamble = (uint16_t)MIN_FSK_PREAMB;
}
} else {
txpkt.preamble = (uint16_t)STD_FSK_PREAMB;
}
} else {
MSG("WARNING: [down] invalid modulation in \"txpk.modu\", TX aborted\n");
json_value_free(root_val);
continue;
}
/* Parse payload length (mandatory) */
val = json_object_get_value(txpk_obj,"size");
if (val == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.size\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
txpkt.size = (uint16_t)json_value_get_number(val);
/* Parse payload data (mandatory) */
str = json_object_get_string(txpk_obj, "data");
if (str == NULL) {
MSG("WARNING: [down] no mandatory \"txpk.data\" object in JSON, TX aborted\n");
json_value_free(root_val);
continue;
}
i = b64_to_bin(str, strlen(str), txpkt.payload, sizeof txpkt.payload);
if (i != txpkt.size) {
MSG("WARNING: [down] mismatch between .size and .data size once converter to binary\n");
}
/* free the JSON parse tree from memory */
json_value_free(root_val);
/* select TX mode */
if (sent_immediate) {
txpkt.tx_mode = IMMEDIATE;
} else {
txpkt.tx_mode = TIMESTAMPED;
}
/* record measurement data */
pthread_mutex_lock(&mx_meas_dw);
meas_dw_dgram_rcv += 1; /* count only datagrams with no JSON errors */
meas_dw_network_byte += msg_len; /* meas_dw_network_byte */
meas_dw_payload_byte += txpkt.size;
pthread_mutex_unlock(&mx_meas_dw);
/* reset error/warning results */
jit_result = warning_result = JIT_ERROR_OK;
warning_value = 0;
/* check TX frequency before trying to queue packet */
if ((txpkt.freq_hz < tx_freq_min[txpkt.rf_chain]) || (txpkt.freq_hz > tx_freq_max[txpkt.rf_chain])) {
jit_result = JIT_ERROR_TX_FREQ;
MSG("ERROR: Packet REJECTED, unsupported frequency - %u (min:%u,max:%u)\n", txpkt.freq_hz, tx_freq_min[txpkt.rf_chain], tx_freq_max[txpkt.rf_chain]);
}
/* check TX power before trying to queue packet, send a warning if not supported */
if (jit_result == JIT_ERROR_OK) {
i = get_tx_gain_lut_index(txpkt.rf_chain, txpkt.rf_power, &tx_lut_idx);
if ((i < 0) || (txlut[txpkt.rf_chain].lut[tx_lut_idx].rf_power != txpkt.rf_power)) {
/* this RF power is not supported, throw a warning, and use the closest lower power supported */
warning_result = JIT_ERROR_TX_POWER;
warning_value = (int32_t)txlut[txpkt.rf_chain].lut[tx_lut_idx].rf_power;
printf("WARNING: Requested TX power is not supported (%ddBm), actual power used: %ddBm\n", txpkt.rf_power, warning_value);
txpkt.rf_power = txlut[txpkt.rf_chain].lut[tx_lut_idx].rf_power;
}
}
/* insert packet to be sent into JIT queue */
if (jit_result == JIT_ERROR_OK) {
pthread_mutex_lock(&mx_concent);
lgw_get_instcnt(¤t_concentrator_time);
pthread_mutex_unlock(&mx_concent);
jit_result = jit_enqueue(&jit_queue[txpkt.rf_chain], current_concentrator_time, &txpkt, downlink_type);
if (jit_result != JIT_ERROR_OK) {
printf("ERROR: Packet REJECTED (jit error=%d)\n", jit_result);
} else {
/* In case of a warning having been raised before, we notify it */
jit_result = warning_result;
}
pthread_mutex_lock(&mx_meas_dw);
meas_nb_tx_requested += 1;
pthread_mutex_unlock(&mx_meas_dw);
}
/* Send acknoledge datagram to server */
send_tx_ack(buff_down[1], buff_down[2], jit_result, warning_value);
}
}
MSG("\nINFO: End of downstream thread\n");
}
3、thread_jit
thread_jit
函数是LoRa网关中管理JIT(Just-In-Time)队列的线程函数。它的主要任务是从JIT队列中提取待发送的数据包,并在适当的时间调用发送函数(如lgw_send
)将数据包通过LoRa集中器发送出去。
工作流程:
1、初始化和进入循环:thread_jit
函数通常也是一个无限循环,持续运行以不断检查和处理JIT队列中的数据包。它会初始化一些必要的变量,然后进入循环。
2、获取当前时间:在循环中,thread_jit
会定期获取当前的系统时间。这是为了确保可以正确地调度数据包的发送时间。
3、检查JIT队列:thread_jit
会检查JIT队列中是否有待发送的数据包。如果队列为空,它会休眠一段时间,然后继续循环。
4、提取并调度数据包:如果JIT队列中有数据包,thread_jit
会提取出最早需要发送的数据包,并检查该数据包的发送时间是否已到。
- 如果发送时间未到,它会计算需要等待的时间,并在此期间休眠。
- 如果发送时间已到,它会立即准备发送数据包。
5、发送数据包:当数据包的发送时间到达时,thread_jit
会调用lgw_send
函数,将数据包发送到LoRa集中器。
6、处理发送结果:thread_jit
会检查lgw_send
函数的返回值,以确定发送是否成功。如果发送失败,它可能会进行适当的错误处理,如记录日志或重试发送。
7、循环继续:发送完成后,thread_jit
会继续检查JIT队列,处理下一个数据包。
void thread_jit(void) {
int result = LGW_HAL_SUCCESS;
struct lgw_pkt_tx_s pkt;
int pkt_index = -1;
uint32_t current_concentrator_time;
enum jit_error_e jit_result;
enum jit_pkt_type_e pkt_type;
uint8_t tx_status;
int i;
while (!exit_sig && !quit_sig) {
wait_ms(10);
for (i = 0; i < LGW_RF_CHAIN_NB; i++) {
/* transfer data and metadata to the concentrator, and schedule TX */
pthread_mutex_lock(&mx_concent);
lgw_get_instcnt(¤t_concentrator_time);
pthread_mutex_unlock(&mx_concent);
jit_result = jit_peek(&jit_queue[i], current_concentrator_time, &pkt_index);
if (jit_result == JIT_ERROR_OK) {
if (pkt_index > -1) {
jit_result = jit_dequeue(&jit_queue[i], pkt_index, &pkt, &pkt_type);
if (jit_result == JIT_ERROR_OK) {
/* update beacon stats */
if (pkt_type == JIT_PKT_TYPE_BEACON) {
/* Compensate breacon frequency with xtal error */
pthread_mutex_lock(&mx_xcorr);
pkt.freq_hz = (uint32_t)(xtal_correct * (double)pkt.freq_hz);
MSG_DEBUG(DEBUG_BEACON, "beacon_pkt.freq_hz=%u (xtal_correct=%.15lf)\n", pkt.freq_hz, xtal_correct);
pthread_mutex_unlock(&mx_xcorr);
/* Update statistics */
pthread_mutex_lock(&mx_meas_dw);
meas_nb_beacon_sent += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG("INFO: Beacon dequeued (count_us=%u)\n", pkt.count_us);
}
/* check if concentrator is free for sending new packet */
pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
result = lgw_status(pkt.rf_chain, TX_STATUS, &tx_status);
pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
if (result == LGW_HAL_ERROR) {
MSG("WARNING: [jit%d] lgw_status failed\n", i);
} else {
if (tx_status == TX_EMITTING) {
MSG("ERROR: concentrator is currently emitting on rf_chain %d\n", i);
print_tx_status(tx_status);
continue;
} else if (tx_status == TX_SCHEDULED) {
MSG("WARNING: a downlink was already scheduled on rf_chain %d, overwritting it...\n", i);
print_tx_status(tx_status);
} else {
/* Nothing to do */
}
}
/* send packet to concentrator */
pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
if (spectral_scan_params.enable == true) {
result = lgw_spectral_scan_abort();
if (result != LGW_HAL_SUCCESS) {
MSG("WARNING: [jit%d] lgw_spectral_scan_abort failed\n", i);
}
}
result = lgw_send(&pkt);
pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
if (result != LGW_HAL_SUCCESS) {
pthread_mutex_lock(&mx_meas_dw);
meas_nb_tx_fail += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG("WARNING: [jit] lgw_send failed on rf_chain %d\n", i);
continue;
} else {
pthread_mutex_lock(&mx_meas_dw);
meas_nb_tx_ok += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG_DEBUG(DEBUG_PKT_FWD, "lgw_send done on rf_chain %d: count_us=%u\n", i, pkt.count_us);
}
} else {
MSG("ERROR: jit_dequeue failed on rf_chain %d with %d\n", i, jit_result);
}
}
} else if (jit_result == JIT_ERROR_EMPTY) {
/* Do nothing, it can happen */
} else {
MSG("ERROR: jit_peek failed on rf_chain %d with %d\n", i, jit_result);
}
}
}
MSG("\nINFO: End of JIT thread\n");
}
4、lgw_send函数
int lgw_send(struct lgw_pkt_tx_s *txpkt);
通常,lgw_send
函数会接受一个结构体参数,该结构体包含了需要发送的数据包及其相关配置信息。而txpkt是
指向待发送数据包结构体的指针,包含了要发送的实际数据和元数据(例如,频率、数据速率、功率等)。
lgw_send
函数的主要目的是通过SPI接口将下行数据包及其元数据配置写入LoRa集中器,并触发LoRa集中器开始发送数据包。
工作流程:
1、参数验证:检查输入参数是否有效,例如检查指针是否为空、数据包长度是否合法等。
2、SPI接口初始化:确保SPI接口已正确初始化,并可以与LoRa集中器进行通信。
3、寄存器配置:根据txpkt
结构体中的信息,配置LoRa集中器的寄存器。包括设置频率、功率、数据速率、编码率等参数。
4、数据写入:将实际的数据(payload)写入LoRa集中器的FIFO缓冲区。
5、启动发送:设置寄存器以启动数据包发送过程。
6、错误处理和返回值:检查每一步的执行情况,处理可能的错误,并返回相应的状态码。
void thread_jit(void) {
int result = LGW_HAL_SUCCESS;
struct lgw_pkt_tx_s pkt;
int pkt_index = -1;
uint32_t current_concentrator_time;
enum jit_error_e jit_result;
enum jit_pkt_type_e pkt_type;
uint8_t tx_status;
int i;
while (!exit_sig && !quit_sig) {
wait_ms(10);
for (i = 0; i < LGW_RF_CHAIN_NB; i++) {
/* transfer data and metadata to the concentrator, and schedule TX */
pthread_mutex_lock(&mx_concent);
lgw_get_instcnt(¤t_concentrator_time);
pthread_mutex_unlock(&mx_concent);
jit_result = jit_peek(&jit_queue[i], current_concentrator_time, &pkt_index);
if (jit_result == JIT_ERROR_OK) {
if (pkt_index > -1) {
jit_result = jit_dequeue(&jit_queue[i], pkt_index, &pkt, &pkt_type);
if (jit_result == JIT_ERROR_OK) {
/* update beacon stats */
if (pkt_type == JIT_PKT_TYPE_BEACON) {
/* Compensate breacon frequency with xtal error */
pthread_mutex_lock(&mx_xcorr);
pkt.freq_hz = (uint32_t)(xtal_correct * (double)pkt.freq_hz);
MSG_DEBUG(DEBUG_BEACON, "beacon_pkt.freq_hz=%u (xtal_correct=%.15lf)\n", pkt.freq_hz, xtal_correct);
pthread_mutex_unlock(&mx_xcorr);
/* Update statistics */
pthread_mutex_lock(&mx_meas_dw);
meas_nb_beacon_sent += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG("INFO: Beacon dequeued (count_us=%u)\n", pkt.count_us);
}
/* check if concentrator is free for sending new packet */
pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
result = lgw_status(pkt.rf_chain, TX_STATUS, &tx_status);
pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
if (result == LGW_HAL_ERROR) {
MSG("WARNING: [jit%d] lgw_status failed\n", i);
} else {
if (tx_status == TX_EMITTING) {
MSG("ERROR: concentrator is currently emitting on rf_chain %d\n", i);
print_tx_status(tx_status);
continue;
} else if (tx_status == TX_SCHEDULED) {
MSG("WARNING: a downlink was already scheduled on rf_chain %d, overwritting it...\n", i);
print_tx_status(tx_status);
} else {
/* Nothing to do */
}
}
/* send packet to concentrator */
pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
if (spectral_scan_params.enable == true) {
result = lgw_spectral_scan_abort();
if (result != LGW_HAL_SUCCESS) {
MSG("WARNING: [jit%d] lgw_spectral_scan_abort failed\n", i);
}
}
result = lgw_send(&pkt);
pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
if (result != LGW_HAL_SUCCESS) {
pthread_mutex_lock(&mx_meas_dw);
meas_nb_tx_fail += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG("WARNING: [jit] lgw_send failed on rf_chain %d\n", i);
continue;
} else {
pthread_mutex_lock(&mx_meas_dw);
meas_nb_tx_ok += 1;
pthread_mutex_unlock(&mx_meas_dw);
MSG_DEBUG(DEBUG_PKT_FWD, "lgw_send done on rf_chain %d: count_us=%u\n", i, pkt.count_us);
}
} else {
MSG("ERROR: jit_dequeue failed on rf_chain %d with %d\n", i, jit_result);
}
}
} else if (jit_result == JIT_ERROR_EMPTY) {
/* Do nothing, it can happen */
} else {
MSG("ERROR: jit_peek failed on rf_chain %d with %d\n", i, jit_result);
}
}
}
MSG("\nINFO: End of JIT thread\n");
}