[개인] 지하철 승하차수 예측 대시보드 - 예측값 산출
개요
- 두 모델을 돌려서 나온 학습 데이터를 가지고 예측값을 산출
- AWS Lambda 함수를 사용, 메모리 용량 및 시간제한으로 4개로 함수 나누어 실행
- 모두 트리거를 설정하여 하루 한번 규칙적인 시간에 실행되게 설정
1. 예측일 설정 및 예측값에 필요한 값 산출
- preprocess_lambda 이름의 함수로 매일 오후 9시반에 실행
- 데이터 수집한 테이블을 통해 예측에 필요한 값들만 추출
1. 날짜 형식 변경
def safe_to_datetime(series):
fmts = [
'%Y-%m-%d','%Y-%m-%d %H:%M:%S','%Y-%m-%d %H:%M',
'%Y%m%d','%m/%d/%Y','%d/%m/%Y'
]
def _c(x):
if pd.isna(x): return pd.NaT
s = str(x).strip()
if s.lower() in ('nat','none',''): return pd.NaT
if isinstance(x, (pd.Timestamp, datetime)):
ts = pd.to_datetime(x)
try: return ts.tz_localize(None)
except: return ts
for f in fmts:
try: return pd.to_datetime(s, format=f)
except: pass
try:
ts = pd.to_datetime(s, errors='coerce', infer_datetime_format=True)
try: return ts.tz_localize(None)
except: return ts
except:
return pd.NaT
return series.apply(_c)
1. 예측일 산출
- subway_stats에서 max(사용일자) → target_date = +1일 ```python q_max = text(“”” SELECT MAX( CASE WHEN “사용일자” ~ ‘^[0-9]{8}$’ THEN to_date(“사용일자”,’YYYYMMDD’) WHEN “사용일자” ~ ‘^[0-9]{4}-[0-9]{2}-[0-9]{2}’ THEN to_date(left(“사용일자”,10),’YYYY-MM-DD’) ELSE NULL END ) AS max_date FROM subway_stats “””) max_row = pd.read_sql(q_max, engine) max_date = max_row.loc[0, ‘max_date’] if pd.isna(max_date): return {“status”: “error”, “message”: “subway_stats에서 max(사용일자) 계산 실패”}
target_date = (pd.to_datetime(max_date) + pd.Timedelta(days=1)).normalize()
### 2. **예측일에 해당하는 값 추출**
- target_date의 weather/holiday만 로드
```python
weather = pd.read_sql(
text("SELECT 날짜, 구분, 값 FROM weather_stats WHERE 날짜::date = :td"),
engine, params={"td": target_date.date()}
)
holiday = pd.read_sql(
text("SELECT 날짜, 공휴일여부 FROM holidays_stats WHERE 날짜::date = :td"),
engine, params={"td": target_date.date()}
)
if weather.empty or holiday.empty:
return {"status": "no_data",
"message": "해당 날짜의 날씨/공휴일 데이터 없음",
"target_date": str(target_date.date())}
3. 가공 및 병합
- melt -> pivot 화
- 세 개의 데이터 통합(merge)
- 공휴일여부에서 null이면 0, null이 아니면 1 cjfl
- 날짜에서 년,월,일 추출
- 기상수치 null이면 0 처리
weather['날짜'] = safe_to_datetime(weather['날짜']).dt.normalize()
holiday['날짜'] = safe_to_datetime(holiday['날짜']).dt.normalize()
weather['구분'] = weather['구분'].astype(str).str.replace(" ", "", regex=False)
weather_daily = (
weather.pivot_table(index='날짜', columns='구분', values='값', aggfunc='mean').reset_index()
)
df = stations.copy()
df['날짜'] = target_date
df = df.merge(weather_daily, on='날짜', how='left').merge(holiday[['날짜','공휴일여부']], on='날짜', how='left')
# 공휴일여부 0/1
df['공휴일여부'] = df['공휴일여부'].fillna('N').map({'Y': 1, 'N': 0}).fillna(0).astype(int)
# 기상 수치 결측 0
for col in ['기온','강수형태','강수','습도','풍속']:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
# 날짜 파생
df['년'] = pd.to_datetime(df['날짜']).dt.year
df['월'] = pd.to_datetime(df['날짜']).dt.month
df['일'] = pd.to_datetime(df['날짜']).dt.day
4. 데이터 S3에 저장
- 예측을 원하는 데이터를 S3에 저장
- 예측값을 산출할 때 해당 데이터를 가져와서 실행 예정
S3_BUCKET = "subway-whitenut-bucket"
S3_KEY_PREFIX = "prepared_data"
key = f"{S3_KEY_PREFIX}/{target_date.strftime('%Y-%m-%d')}.csv"
buf = BytesIO()
# pandas가 BytesIO에 UTF-8로 씁니다.
df.to_csv(buf, index=False)
buf.seek(0)
s3.upload_fileobj(buf, S3_BUCKET, key, ExtraArgs={"ContentType": "text/csv; charset=utf-8"})
2. Xgboost 실행
- subwayPredictXGboost 라는 이름의 함수로 매일 오후 10시에 실행
- 전 단계에서 실행된 데이터를 가지고 예측값을 산출
- 전처리 진행 후 S3에 저장
1. csv 파일 불러오기, 저장
def _read_csv_from_s3(s3, key, encoding="utf-8"):
obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
return pd.read_csv(BytesIO(obj["Body"].read()), encoding=encoding)
def _write_csv_to_s3(s3, df, key):
data = df.to_csv(index=False).encode("utf-8")
buf = BytesIO(data)
s3.upload_fileobj(buf, S3_BUCKET, key, ExtraArgs={"ContentType": "text/csv; charset=utf-8"})
2. 학습데이터 불러오기, 인코딩하기
def _load_joblib(s3, key):
buf = BytesIO()
s3.download_fileobj(S3_BUCKET, key, buf)
buf.seek(0)
return joblib.load(buf)
def _onehot_weekday(df):
df["날짜"] = pd.to_datetime(df["날짜"]).dt.normalize()
df["년"] = df["날짜"].dt.year
df["월"] = df["날짜"].dt.month
df["일"] = df["날짜"].dt.day
weekday_map = {0:'월',1:'화',2:'수',3:'목',4:'금',5:'토',6:'일'}
df["요일문자"] = df["날짜"].dt.dayofweek.map(weekday_map)
for d in ['월','화','수','목','금','토','일']:
df[f'요일_{d}'] = (df['요일문자'] == d).astype(int)
return df
def _safe_label_encode(le, series, name):
series = series.astype(str).str.strip()
known = set(le.classes_.tolist())
mask = series.isin(known)
if not mask.all():
print(f"[WARN] unseen {name} labels dropped: {series[~mask].unique().tolist()}")
enc = le.transform(series[mask])
return enc, mask
3. 예측에 사용될 데이터 가져오기
- 가장 최근에 있는 xgboost 파일 가져오기
def _find_latest_prepared_csv(s3): """prepared_data/ 아래 가장 최근 CSV 키 반환 (없으면 None).""" # list_objects_v2는 최대 1000개라, 폴더 규모 크면 paginator 권장 paginator = s3.get_paginator("list_objects_v2") latest = None for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=f"{INPUT_PREFIX}/"): for obj in page.get("Contents", []): key = obj["Key"] if key.endswith(".csv"): if (latest is None) or (obj["LastModified"] > latest["LastModified"]): latest = obj return None if latest is None else latest["Key"]
4. 예측
- 위에 함수를 사용하여 원하는 데이터 셋을 형성
# 입력 데이터 로드
df = _read_csv_from_s3(s3, in_key)
if df.empty:
return {"status": "error", "message": "입력 CSV가 비어 있음", "input_key": in_key}
# 모델/인코더/피처 로드
models = _load_joblib(s3, MODEL_XGB_KEY) # dict 구조
features = _load_joblib(s3, FEATURES_KEY) # list
le_line = _load_joblib(s3, LINE_ENCODER_KEY)
le_station = _load_joblib(s3, STATION_ENCODER_KEY)
lgb_board = models['승차']['xgb'] # LightGBM(승차)
lgb_alight = models['하차']['xgb'] # LightGBM(하차)
# 예측
y_board = xgb_board.predict(X)
y_alight = xgb_alight.predict(X)
# 결과 생성
out_rows = []
for i, row in df.iterrows():
day_str = str(pd.to_datetime(row['날짜']).date())
out_rows.append({
"날짜": day_str, "호선": row["호선"], "역명": row["역명"],
"구분": "승차_xgb", "예측값": int(max(0, np.rint(y_board[i])))})
out_rows.append({
"날짜": day_str, "호선": row["호선"], "역명": row["역명"],
"구분": "하차_xgb", "예측값": int(max(0, np.rint(y_alight[i])))})
out_df = pd.DataFrame(out_rows)
5. S3 저장
- 예측값을 S3에 저장
date_token = pd.to_datetime(df['날짜'].iloc[0]).strftime("%Y-%m-%d") out_key = f"{OUTPUT_PREFIX}/{date_token}_xgb.csv" _write_csv_to_s3(s3, out_df, out_key)
3.LightBGM 실행
- subwayPredictLightBGM 라는 이름의 함수로 매일 오후 10시반에 실행
- 전 단계에서 실행된 데이터를 가지고 예측값을 산출
- 전처리 진행 후 S3에 저장
- 위의 XGboost와 과정 및 코드가 동일하며, 예측 부분만 다름
# 입력 데이터 로드
df = _read_csv_from_s3(s3, in_key)
if df.empty:
return {"status": "error", "message": "입력 CSV가 비어 있음", "input_key": in_key}
# 모델/인코더/피처 로드
models = _load_joblib(s3, MODEL_LGB_KEY) # dict 구조
features = _load_joblib(s3, FEATURES_KEY) # list
le_line = _load_joblib(s3, LINE_ENCODER_KEY)
le_station = _load_joblib(s3, STATION_ENCODER_KEY)
lgb_board = models['승차']['lgb'] # LightGBM(승차)
lgb_alight = models['하차']['lgb'] # LightGBM(하차)
# 예측
y_board = lgb_board.predict(X)
y_alight = lgb_alight.predict(X)
# 결과 생성
out_rows = []
for i, row in df.iterrows():
day_str = str(pd.to_datetime(row['날짜']).date())
out_rows.append({
"날짜": day_str, "호선": row["호선"], "역명": row["역명"],
"구분": "승차_lgb", "예측값": int(max(0, np.rint(y_board[i])))})
out_rows.append({
"날짜": day_str, "호선": row["호선"], "역명": row["역명"],
"구분": "하차_lgb", "예측값": int(max(0, np.rint(y_alight[i])))})
out_df = pd.DataFrame(out_rows)
4. 예측값 결과 전처리
- lightgbm_xgbosst라는 이름으로 매일 오후 11시에 실행
- 예측한 결과물을 시각화하기 쉽게 전처리 진행
1. s3에서 데이터 불러오기
- 가장 최신 날짜의 Xgboost, lightGBM의 데이터 가져오기
pat = re.compile(r"(?P<date>\d{4}-\d{2}-\d{2})_(?P<model>xgb|lgb)\.csv$")
def _list_prediction_keys(s3):
"""predictions/ 아래의 xgb/lgb csv 키 모두 나열"""
pg = s3.get_paginator("list_objects_v2")
keys = []
for page in pg.paginate(Bucket=S3_BUCKET, Prefix=PREDICTIONS_PREFIX):
for obj in page.get("Contents", []):
k = obj["Key"]
if k.endswith(".csv"):
m = pat.search(k)
if m:
keys.append(k)
return keys
def _find_latest_pair(keys):
"""xgb와 lgb가 모두 존재하는 가장 최근 날짜의 (xgb_key, lgb_key) 반환"""
by_date = {}
for k in keys:
m = pat.search(k)
if not m:
continue
d = m.group("date")
model = m.group("model")
by_date.setdefault(d, set()).add(model)
# 두 모델 다 있는 날짜만 후보
candidates = [d for d, models in by_date.items() if {"xgb", "lgb"} <= models]
if not candidates:
return None, None, None
latest = max(candidates) # YYYY-MM-DD 문자열은 사전순=시간순
# 실제 키 찾아서 리턴
xgb_key = f"{PREDICTIONS_PREFIX}{latest}_xgb.csv"
lgb_key = f"{PREDICTIONS_PREFIX}{latest}_lgb.csv"
return latest, xgb_key, lgb_key
2. 데이터 유니온
- 데이터 유니온 후 DB에 적재 ```python s3 = boto3.client(“s3”)
df_xgb = _read_csv(s3, xgb_key) df_lgb = _read_csv(s3, lgb_key)
검증 & 정리
required_cols = [“날짜”,”호선”,”역명”,”구분”,”예측값”] for name, df in [(“xgb”, df_xgb), (“lgb”, df_lgb)]: missing = [c for c in required_cols if c not in df.columns] if missing: return {“status”:”error”,”message”:f”{name} 컬럼 누락: {missing}”}
타입 보정
for df in (df_xgb, df_lgb): df[“날짜”] = pd.to_datetime(df[“날짜”]).dt.date df[“예측값”] = pd.to_numeric(df[“예측값”], errors=”coerce”).fillna(0).astype(int) df[“호선”] = df[“호선”].astype(str) df[“역명”] = df[“역명”].astype(str) df[“구분”] = df[“구분”].astype(str)
유니온
df_all = pd.concat([df_xgb, df_lgb], ignore_index=True) df_all = df_all.rename(columns={“구분”: “target_model”})
DB 저장
_write_db(df_all)
target_date = str(df_all[“날짜”].iloc[0]) return { “status”:”ok”, “date”: target_date, “rows”: int(len(df_all)), “xgb_key”: xgb_key, “lgb_key”: lgb_key, “table”: TABLE_NAME } ``` | 날짜 | 호선 | 역명 | 구분 | 예측값 | |————|——-|——————————|———–|——–:| | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 하차_lgb | 15726 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 하차_lgb | 14720 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 하차_lgb | 10149 | | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 승차_xgb | 15654 | | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 승차_lgb | 16997 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 승차_lgb | 15959 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 승차_lgb | 9813 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 승차_xgb | 10201 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 하차_xgb | 8885 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 하차_xgb | 14372 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 승차_xgb | 14861 | | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 하차_xgb | 18051 | | 2025-07-31 | 1호선 | 종로5가 | 하차_xgb | 27128 | | 2025-07-31 | 1호선 | 종로5가 | 승차_xgb | 26806 | | 2025-07-31 | 1호선 | 종로5가 | 하차_lgb | 25534 |