개요
- AWS Lambda 함수를 통해 매일 수집한 지하철 승하차 데이터와 날씨·공휴일 데이터를 통합 후, 예측 모델에 적합하도록 전처리를 진행함
- 전처리된 데이터를 LightGBM & XGBoost로 학습하고 AWS S3에 저장해놓고 매일 가져와서 예측값을 산출함
- 모든 과정은 AWS Lambda, S3, RDS를 통해 진행됨
1. 데이터 모델링을 위한 전처리
데이터
- 지하철 승하차수 : melt -> pivot화 , 날짜 형식 (%Y%M%D)로 변경, 역명&호선명은 텍스트로 Label Encoding 진행
- 날씨 : melt -> pivot화 , 날짜 형식 (%Y%M%D)로 변경
- 공휴일 : 날짜 형식 (%Y%M%D)로 변경, 공휴일 여부에 값이 있으면 1, 없으면 0 처리, 요일은 텍스트로 One-Hot Encoding 진행
1. 전처리
def preprocess(subway, weather, holiday):
subway.rename(columns={'사용일자': '날짜'}, inplace=True)
weather['구분'] = weather['구분'].str.replace(" ", "")
# 2. 데이터 병합 (기존 코드 그대로)
subway['날짜'] = pd.to_datetime(subway['날짜'], format='mixed')
weather['날짜'] = pd.to_datetime(weather['날짜'], format='mixed')
holiday['날짜'] = pd.to_datetime(holiday['날짜'], format='mixed')
# 시간 단위 → 일 단위 평균/최대값 집계
weather_daily = weather.pivot_table(
index='날짜',
columns='구분',
values='값',
aggfunc='mean' # or max depending on category
).reset_index()
# subway 기준으로 합치기
df = subway.merge(weather_daily, on='날짜', how='left')
df = df.merge(holiday, on='날짜', how='left')
df['날짜'] = df['날짜'].dt.strftime('%Y%m%d')
df['공휴일여부'] = df['공휴일여부'].map({'Y': 1, 'N': 0})
df['날짜'] = pd.to_datetime(df['날짜'])
# 날짜에서 필요한 숫자형 파생변수 생성 예시
df['년'] = df['날짜'].dt.year
df['월'] = df['날짜'].dt.month
df['일'] = df['날짜'].dt.day
return df
2. 인코딩
le_line = LabelEncoder()
df['호선_enc'] = le_line.fit_transform(df['호선'])
le_station = LabelEncoder()
df['역명_enc'] = le_station.fit_transform(df['역명'])
df = pd.get_dummies(df, columns=['요일'])
print(df)
3. Feature와 Target 정의
features = ['년','월','일',
'공휴일여부',
'기온',
'강수형태',
'강수',
'습도',
'풍속',
'호선_enc',
'역명_enc',
'요일_월', '요일_화', '요일_수', '요일_목', '요일_금', '요일_토', '요일_일'
]
Target =['승차','하차']
2. 데이터 모델링
- LightGBM & XGBoost를 사용
- 승차, 하차를 각각 target으로 잡고 각자 예측값 산출
- XGBoost의 옵션
- random_state=42,
- n_estimators=100,
- learning_rate=0.1,
- max_depth=6,
- n_jobs=-1
- LightGBM의 옵션
- random_state=42,
- n_estimators=100,
- learning_rate=0.1,
- max_depth=6,
- n_jobs=-1
def train_models(df, features):
"""
승차와 하차 각각에 대해 XGBoost와 LightGBM 모델 학습
"""
models = {}
for target in ['승차', '하차']:
print(f"\n=== {target} 예측 모델 학습 ===")
X = df[features]
y = df[target]
# 학습/검증 데이터 분리
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, shuffle=False)
models[target] = {}
# XGBoost 모델
print("XGBoost 학습 중...")
xgb_model = xgb.XGBRegressor(
random_state=42,
n_estimators=100,
learning_rate=0.1,
max_depth=6,
n_jobs=-1
)
xgb_model.fit(X_train, y_train)
xgb_pred = xgb_model.predict(X_val)
xgb_rmse = np.sqrt(mean_squared_error(y_val, xgb_pred))
print(f"XGBoost {target} 검증 RMSE: {xgb_rmse:.2f}")
models[target]['xgb'] = xgb_model
# LightGBM 모델
print("LightGBM 학습 중...")
lgb_model = lgb.LGBMRegressor(
random_state=42,
n_estimators=100,
learning_rate=0.1,
max_depth=6,
verbose=-1
)
lgb_model.fit(X_train, y_train)
lgb_pred = lgb_model.predict(X_val)
lgb_rmse = np.sqrt(mean_squared_error(y_val, lgb_pred))
print(f"LightGBM {target} 검증 RMSE: {lgb_rmse:.2f}")
models[target]['lgb'] = lgb_model
# 앙상블 성능
ensemble_pred = (xgb_pred + lgb_pred) / 2
ensemble_rmse = np.sqrt(mean_squared_error(y_val, ensemble_pred))
print(f"Ensemble {target} 검증 RMSE: {ensemble_rmse:.2f}")
return models
3. 학습데이터 S3에 저장
- 해당 학습을 매일 하는 건 무리라고 판단하고 한달에 한번만 진행
- 로컬로 수동으로 진행중
- lamdba 함수로 돌리기에는 용량이나 시간제한이 있어 불가능
- EC2나 Batch 이용법을 찾아봐야될듯
- S3에 학습 내용을 저장하고 매일 예측할 때마다 가져와서 사용
bucket = "subway-whitenut-bucket"
prefix = ""
# ✅ S3 저장 함수
def save_to_s3(models, le_line, le_station, features, bucket, prefix):
s3 = boto3.client('s3')
def upload_joblib(obj, filename):
buffer = BytesIO()
joblib.dump(obj, buffer)
buffer.seek(0)
s3.upload_fileobj(buffer, bucket, f"{prefix}{filename}.joblib")
upload_joblib(models, "model")
upload_joblib(le_line, "line_encoder")
upload_joblib(le_station, "station_encoder")
upload_joblib(features, "features")
save_to_s3(models, le_line, le_station, features, bucket, prefix)