AWS IoT

AWS Kinesis, AWS S3를 이용한 데이터 수집

임베디드 친구 2025. 4. 12. 15:00
728x90
반응형

AWS Kinesis, AWS S3를 이용한 데이터 수집

1. 서론

AWS IoT 시스템에서 수집된 데이터는 다양한 분석 및 시각화 목적으로 저장되고 처리될 수 있습니다. 이를 위해 AWS는 Amazon KinesisAmazon S3를 활용할 수 있도록 지원합니다. 이 글에서는 AWS IoT 디바이스에서 수집된 데이터를 AWS Kinesis를 통해 AWS S3로 저장하는 방법을 설명합니다.

본 예제에서는 ESP32를 IoT 디바이스로 사용하며, ESP-IDF와 AWS IoT Device SDK for Embedded C를 활용하여 데이터를 AWS IoT Core로 전송합니다. 서버 측에서는 Python을 이용하여 AWS Kinesis와 AWS S3를 설정하고 데이터를 수집하는 과정을 설명하겠습니다.


2. AWS Kinesis 및 S3 개요

2.1 AWS Kinesis

AWS Kinesis는 실시간 스트리밍 데이터 처리 서비스로, 대량의 데이터를 실시간으로 수집하고 저장할 수 있습니다. Kinesis Streams를 활용하면 IoT 디바이스에서 전송된 데이터를 실시간으로 분석하거나 AWS S3 등의 저장소로 보낼 수 있습니다.

2.2 AWS S3

Amazon S3(Simple Storage Service)는 확장 가능한 객체 스토리지로, 데이터를 안전하게 저장하고 분석할 수 있도록 지원합니다. Kinesis를 통해 수집된 IoT 데이터를 S3에 저장하면 장기 보관 및 추가 분석이 가능합니다.


3. AWS 환경 설정

AWS Kinesis와 S3를 이용하여 IoT 데이터를 수집하는 과정을 설정하겠습니다.

3.1 AWS Kinesis 스트림 생성

  1. AWS 콘솔에서 Kinesis 서비스로 이동합니다.
  2. Create data stream 버튼을 클릭합니다.
  3. 스트림 이름을 설정합니다. (예: IoTDataStream)
  4. 샤드(Shard) 개수를 선택합니다. (기본값: 1)
  5. 스트림을 생성한 후, ARN 값을 저장해 둡니다.

3.2 AWS S3 버킷 생성

  1. AWS 콘솔에서 S3 서비스로 이동합니다.
  2. Create bucket을 클릭합니다.
  3. 버킷 이름을 설정합니다. (예: iot-data-bucket)
  4. 퍼블릭 액세스를 비활성화하고, 버킷을 생성합니다.

3.3 AWS IoT Rule을 이용한 Kinesis 데이터 전송

AWS IoT Core에서 IoT 디바이스에서 수집된 데이터를 Kinesis로 전송하는 Rule을 설정해야 합니다.

  1. AWS IoT 콘솔에서 Act 메뉴로 이동합니다.
  2. Create a rule을 선택합니다.
  3. SQL 쿼리를 작성하여 특정 IoT 데이터를 필터링합니다.
    SELECT temperature, humidity, timestamp FROM 'iot/sensor/data'
  4. Kinesis를 선택하고, 앞서 생성한 IoTDataStream을 연결합니다.
  5. IAM Role을 생성하여 AWS IoT가 Kinesis에 데이터를 전송할 수 있도록 설정합니다.
  6. Rule을 저장합니다.

4. ESP32 IoT 디바이스 코드 (AWS IoT Device SDK for Embedded C)

ESP32에서 온도 데이터를 수집하고 MQTT를 통해 AWS IoT Core로 전송하는 코드를 작성합니다.

#include <stdio.h>
#include "aws_iot_mqtt_client_interface.h"
#include "esp_log.h"

#define AWS_IOT_MQTT_TOPIC "iot/sensor/data"
#define AWS_IOT_MQTT_QOS 0

void publish_temperature_data(AWS_IoT_Client *client) {
    char payload[100];
    float temperature = 25.5;  // 예제 온도 데이터
    snprintf(payload, sizeof(payload), "{\"temperature\":%.2f, \"timestamp\":%lu}", temperature, (unsigned long)time(NULL));

    IoT_Publish_Message_Params params;
    params.qos = AWS_IOT_MQTT_QOS;
    params.payload = (void *)payload;
    params.payloadLen = strlen(payload);

    aws_iot_mqtt_publish(client, AWS_IOT_MQTT_TOPIC, strlen(AWS_IOT_MQTT_TOPIC), &params);
    ESP_LOGI("AWS", "Published: %s", payload);
}

5. AWS Kinesis에서 데이터를 AWS S3로 전송 (Python 코드)

AWS Kinesis에서 데이터를 가져와 AWS S3에 저장하는 Python 코드를 작성합니다.

import boto3
import json
from datetime import datetime

# AWS 클라이언트 설정
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
s3_client = boto3.client('s3')

STREAM_NAME = 'IoTDataStream'
S3_BUCKET = 'iot-data-bucket'

# Kinesis에서 데이터 수신
response = kinesis_client.get_shard_iterator(StreamName=STREAM_NAME, ShardId='shardId-000000000000', ShardIteratorType='LATEST')
shard_iterator = response['ShardIterator']

records_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)

# 데이터를 S3에 저장
for record in records_response['Records']:
    data = json.loads(record['Data'].decode('utf-8'))
    filename = f"iot_data_{datetime.utcnow().isoformat()}.json"
    s3_client.put_object(Bucket=S3_BUCKET, Key=filename, Body=json.dumps(data))
    print(f"Stored data in S3: {filename}")

6. 결론

이 글에서는 AWS Kinesis와 S3를 이용한 IoT 데이터 수집 방법을 설명하였습니다.

  1. AWS IoT Core에서 Kinesis로 데이터를 전송하는 Rule을 설정하였습니다.
  2. ESP32 디바이스에서 AWS IoT Core로 데이터를 전송하는 예제 코드를 작성하였습니다.
  3. AWS Kinesis에서 데이터를 가져와 AWS S3에 저장하는 Python 코드를 작성하였습니다.

이러한 방식을 활용하면 IoT 디바이스에서 수집된 데이터를 실시간으로 저장 및 분석할 수 있으며, 향후 AWS Athena, AWS Glue 등을 활용하여 추가 분석을 수행할 수도 있습니다.

반응형