AWS Kinesis, AWS S3를 이용한 데이터 수집
1. 서론
AWS IoT 시스템에서 수집된 데이터는 다양한 분석 및 시각화 목적으로 저장되고 처리될 수 있습니다. 이를 위해 AWS는 Amazon Kinesis와 Amazon S3를 활용할 수 있도록 지원합니다. 이 글에서는 AWS IoT 디바이스에서 수집된 데이터를 AWS Kinesis를 통해 AWS S3로 저장하는 방법을 설명합니다.
본 예제에서는 ESP32를 IoT 디바이스로 사용하며, ESP-IDF와 AWS IoT Device SDK for Embedded C를 활용하여 데이터를 AWS IoT Core로 전송합니다. 서버 측에서는 Python을 이용하여 AWS Kinesis와 AWS S3를 설정하고 데이터를 수집하는 과정을 설명하겠습니다.
2. AWS Kinesis 및 S3 개요
2.1 AWS Kinesis
AWS Kinesis는 실시간 스트리밍 데이터 처리 서비스로, 대량의 데이터를 실시간으로 수집하고 저장할 수 있습니다. Kinesis Streams를 활용하면 IoT 디바이스에서 전송된 데이터를 실시간으로 분석하거나 AWS S3 등의 저장소로 보낼 수 있습니다.
2.2 AWS S3
Amazon S3(Simple Storage Service)는 확장 가능한 객체 스토리지로, 데이터를 안전하게 저장하고 분석할 수 있도록 지원합니다. Kinesis를 통해 수집된 IoT 데이터를 S3에 저장하면 장기 보관 및 추가 분석이 가능합니다.
3. AWS 환경 설정
AWS Kinesis와 S3를 이용하여 IoT 데이터를 수집하는 과정을 설정하겠습니다.
3.1 AWS Kinesis 스트림 생성
- AWS 콘솔에서 Kinesis 서비스로 이동합니다.
- Create data stream 버튼을 클릭합니다.
- 스트림 이름을 설정합니다. (예:
IoTDataStream
) - 샤드(Shard) 개수를 선택합니다. (기본값: 1)
- 스트림을 생성한 후, ARN 값을 저장해 둡니다.
3.2 AWS S3 버킷 생성
- AWS 콘솔에서 S3 서비스로 이동합니다.
- Create bucket을 클릭합니다.
- 버킷 이름을 설정합니다. (예:
iot-data-bucket
) - 퍼블릭 액세스를 비활성화하고, 버킷을 생성합니다.
3.3 AWS IoT Rule을 이용한 Kinesis 데이터 전송
AWS IoT Core에서 IoT 디바이스에서 수집된 데이터를 Kinesis로 전송하는 Rule을 설정해야 합니다.
- AWS IoT 콘솔에서 Act 메뉴로 이동합니다.
- Create a rule을 선택합니다.
- SQL 쿼리를 작성하여 특정 IoT 데이터를 필터링합니다.
SELECT temperature, humidity, timestamp FROM 'iot/sensor/data'
- Kinesis를 선택하고, 앞서 생성한
IoTDataStream
을 연결합니다. - IAM Role을 생성하여 AWS IoT가 Kinesis에 데이터를 전송할 수 있도록 설정합니다.
- Rule을 저장합니다.
4. ESP32 IoT 디바이스 코드 (AWS IoT Device SDK for Embedded C)
ESP32에서 온도 데이터를 수집하고 MQTT를 통해 AWS IoT Core로 전송하는 코드를 작성합니다.
#include <stdio.h>
#include "aws_iot_mqtt_client_interface.h"
#include "esp_log.h"
#define AWS_IOT_MQTT_TOPIC "iot/sensor/data"
#define AWS_IOT_MQTT_QOS 0
void publish_temperature_data(AWS_IoT_Client *client) {
char payload[100];
float temperature = 25.5; // 예제 온도 데이터
snprintf(payload, sizeof(payload), "{\"temperature\":%.2f, \"timestamp\":%lu}", temperature, (unsigned long)time(NULL));
IoT_Publish_Message_Params params;
params.qos = AWS_IOT_MQTT_QOS;
params.payload = (void *)payload;
params.payloadLen = strlen(payload);
aws_iot_mqtt_publish(client, AWS_IOT_MQTT_TOPIC, strlen(AWS_IOT_MQTT_TOPIC), ¶ms);
ESP_LOGI("AWS", "Published: %s", payload);
}
5. AWS Kinesis에서 데이터를 AWS S3로 전송 (Python 코드)
AWS Kinesis에서 데이터를 가져와 AWS S3에 저장하는 Python 코드를 작성합니다.
import boto3
import json
from datetime import datetime
# AWS 클라이언트 설정
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
s3_client = boto3.client('s3')
STREAM_NAME = 'IoTDataStream'
S3_BUCKET = 'iot-data-bucket'
# Kinesis에서 데이터 수신
response = kinesis_client.get_shard_iterator(StreamName=STREAM_NAME, ShardId='shardId-000000000000', ShardIteratorType='LATEST')
shard_iterator = response['ShardIterator']
records_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)
# 데이터를 S3에 저장
for record in records_response['Records']:
data = json.loads(record['Data'].decode('utf-8'))
filename = f"iot_data_{datetime.utcnow().isoformat()}.json"
s3_client.put_object(Bucket=S3_BUCKET, Key=filename, Body=json.dumps(data))
print(f"Stored data in S3: {filename}")
6. 결론
이 글에서는 AWS Kinesis와 S3를 이용한 IoT 데이터 수집 방법을 설명하였습니다.
- AWS IoT Core에서 Kinesis로 데이터를 전송하는 Rule을 설정하였습니다.
- ESP32 디바이스에서 AWS IoT Core로 데이터를 전송하는 예제 코드를 작성하였습니다.
- AWS Kinesis에서 데이터를 가져와 AWS S3에 저장하는 Python 코드를 작성하였습니다.
이러한 방식을 활용하면 IoT 디바이스에서 수집된 데이터를 실시간으로 저장 및 분석할 수 있으며, 향후 AWS Athena, AWS Glue 등을 활용하여 추가 분석을 수행할 수도 있습니다.
'AWS IoT' 카테고리의 다른 글
AWS QuickSight, Grafana를 활용한 실시간 데이터 시각화 (0) | 2025.04.13 |
---|---|
AWS IoT 서버에서 Amazon SNS 및 AWS CloudWatch와 연동하여 알림 전송 (0) | 2025.04.11 |
AWS Lambda를 활용한 이벤트 기반 자동화 (이상 온도 감지 및 알림) (0) | 2025.04.10 |
AWS IoT Rules Engine을 이용한 데이터 필터링 및 처리 (0) | 2025.04.09 |
AWS IoT Device Shadow 활용: 온도 센싱 주기 및 시작/종료 관리 (0) | 2025.04.08 |