6 분 소요

개요

  • 두 모델을 돌려서 나온 학습 데이터를 가지고 예측값을 산출
  • AWS Lambda 함수를 사용, 메모리 용량 및 시간제한으로 4개로 함수 나누어 실행
  • 모두 트리거를 설정하여 하루 한번 규칙적인 시간에 실행되게 설정

1. 예측일 설정 및 예측값에 필요한 값 산출

  • preprocess_lambda 이름의 함수로 매일 오후 9시반에 실행
  • 데이터 수집한 테이블을 통해 예측에 필요한 값들만 추출

1. 날짜 형식 변경

def safe_to_datetime(series):
    fmts = [
        '%Y-%m-%d','%Y-%m-%d %H:%M:%S','%Y-%m-%d %H:%M',
        '%Y%m%d','%m/%d/%Y','%d/%m/%Y'
    ]
    def _c(x):
        if pd.isna(x): return pd.NaT
        s = str(x).strip()
        if s.lower() in ('nat','none',''): return pd.NaT
        if isinstance(x, (pd.Timestamp, datetime)):
            ts = pd.to_datetime(x)
            try: return ts.tz_localize(None)
            except: return ts
        for f in fmts:
            try: return pd.to_datetime(s, format=f)
            except: pass
        try:
            ts = pd.to_datetime(s, errors='coerce', infer_datetime_format=True)
            try: return ts.tz_localize(None)
            except: return ts
        except:
            return pd.NaT
    return series.apply(_c)

1. 예측일 산출

  • subway_stats에서 max(사용일자) → target_date = +1일 ```python q_max = text(“”” SELECT MAX( CASE WHEN “사용일자” ~ ‘^[0-9]{8}$’ THEN to_date(“사용일자”,’YYYYMMDD’) WHEN “사용일자” ~ ‘^[0-9]{4}-[0-9]{2}-[0-9]{2}’ THEN to_date(left(“사용일자”,10),’YYYY-MM-DD’) ELSE NULL END ) AS max_date FROM subway_stats “””) max_row = pd.read_sql(q_max, engine) max_date = max_row.loc[0, ‘max_date’] if pd.isna(max_date): return {“status”: “error”, “message”: “subway_stats에서 max(사용일자) 계산 실패”}

target_date = (pd.to_datetime(max_date) + pd.Timedelta(days=1)).normalize()


### 2. **예측일에 해당하는 값 추출**
- target_date의 weather/holiday만 로드
```python
weather = pd.read_sql(
    text("SELECT 날짜, 구분, 값 FROM weather_stats WHERE 날짜::date = :td"),
    engine, params={"td": target_date.date()}
)
holiday = pd.read_sql(
    text("SELECT 날짜, 공휴일여부 FROM holidays_stats WHERE 날짜::date = :td"),
    engine, params={"td": target_date.date()}
)
if weather.empty or holiday.empty:
    return {"status": "no_data",
            "message": "해당 날짜의 날씨/공휴일 데이터 없음",
            "target_date": str(target_date.date())}

3. 가공 및 병합

  • melt -> pivot 화
  • 세 개의 데이터 통합(merge)
  • 공휴일여부에서 null이면 0, null이 아니면 1 cjfl
  • 날짜에서 년,월,일 추출
  • 기상수치 null이면 0 처리
weather['날짜'] = safe_to_datetime(weather['날짜']).dt.normalize()
holiday['날짜'] = safe_to_datetime(holiday['날짜']).dt.normalize()
weather['구분'] = weather['구분'].astype(str).str.replace(" ", "", regex=False)

weather_daily = (
    weather.pivot_table(index='날짜', columns='구분', values='값', aggfunc='mean').reset_index()
)

df = stations.copy()
df['날짜'] = target_date
df = df.merge(weather_daily, on='날짜', how='left').merge(holiday[['날짜','공휴일여부']], on='날짜', how='left')

# 공휴일여부 0/1
df['공휴일여부'] = df['공휴일여부'].fillna('N').map({'Y': 1, 'N': 0}).fillna(0).astype(int)

# 기상 수치 결측 0
for col in ['기온','강수형태','강수','습도','풍속']:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

# 날짜 파생
df['년'] = pd.to_datetime(df['날짜']).dt.year
df['월'] = pd.to_datetime(df['날짜']).dt.month
df['일'] = pd.to_datetime(df['날짜']).dt.day

4. 데이터 S3에 저장

  • 예측을 원하는 데이터를 S3에 저장
  • 예측값을 산출할 때 해당 데이터를 가져와서 실행 예정
S3_BUCKET = "subway-whitenut-bucket"
S3_KEY_PREFIX = "prepared_data" 

key = f"{S3_KEY_PREFIX}/{target_date.strftime('%Y-%m-%d')}.csv"
buf = BytesIO()
# pandas가 BytesIO에 UTF-8로 씁니다.
df.to_csv(buf, index=False)
buf.seek(0)
s3.upload_fileobj(buf, S3_BUCKET, key, ExtraArgs={"ContentType": "text/csv; charset=utf-8"})

2. Xgboost 실행

  • subwayPredictXGboost 라는 이름의 함수로 매일 오후 10시에 실행
  • 전 단계에서 실행된 데이터를 가지고 예측값을 산출
  • 전처리 진행 후 S3에 저장

1. csv 파일 불러오기, 저장

def _read_csv_from_s3(s3, key, encoding="utf-8"):
    obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
    return pd.read_csv(BytesIO(obj["Body"].read()), encoding=encoding)

def _write_csv_to_s3(s3, df, key):
    data = df.to_csv(index=False).encode("utf-8")
    buf = BytesIO(data)
    s3.upload_fileobj(buf, S3_BUCKET, key, ExtraArgs={"ContentType": "text/csv; charset=utf-8"})

2. 학습데이터 불러오기, 인코딩하기

def _load_joblib(s3, key):
    buf = BytesIO()
    s3.download_fileobj(S3_BUCKET, key, buf)
    buf.seek(0)
    return joblib.load(buf)

def _onehot_weekday(df):
    df["날짜"] = pd.to_datetime(df["날짜"]).dt.normalize()
    df["년"] = df["날짜"].dt.year
    df["월"] = df["날짜"].dt.month
    df["일"] = df["날짜"].dt.day
    weekday_map = {0:'월',1:'화',2:'수',3:'목',4:'금',5:'토',6:'일'}
    df["요일문자"] = df["날짜"].dt.dayofweek.map(weekday_map)
    for d in ['월','화','수','목','금','토','일']:
        df[f'요일_{d}'] = (df['요일문자'] == d).astype(int)
    return df

def _safe_label_encode(le, series, name):
    series = series.astype(str).str.strip()
    known = set(le.classes_.tolist())
    mask = series.isin(known)
    if not mask.all():
        print(f"[WARN] unseen {name} labels dropped: {series[~mask].unique().tolist()}")
    enc = le.transform(series[mask])
    return enc, mask

3. 예측에 사용될 데이터 가져오기

  • 가장 최근에 있는 xgboost 파일 가져오기
    def _find_latest_prepared_csv(s3):
      """prepared_data/ 아래 가장 최근 CSV 키 반환 (없으면 None)."""
      # list_objects_v2는 최대 1000개라, 폴더 규모 크면 paginator 권장
      paginator = s3.get_paginator("list_objects_v2")
      latest = None
      for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=f"{INPUT_PREFIX}/"):
          for obj in page.get("Contents", []):
              key = obj["Key"]
              if key.endswith(".csv"):
                  if (latest is None) or (obj["LastModified"] > latest["LastModified"]):
                      latest = obj
      return None if latest is None else latest["Key"]
    

4. 예측

  • 위에 함수를 사용하여 원하는 데이터 셋을 형성
# 입력 데이터 로드
df = _read_csv_from_s3(s3, in_key)
if df.empty:
    return {"status": "error", "message": "입력 CSV가 비어 있음", "input_key": in_key}

# 모델/인코더/피처 로드
models      = _load_joblib(s3, MODEL_XGB_KEY)            # dict 구조
features    = _load_joblib(s3, FEATURES_KEY)         # list
le_line     = _load_joblib(s3, LINE_ENCODER_KEY)
le_station  = _load_joblib(s3, STATION_ENCODER_KEY)

lgb_board   = models['승차']['xgb']                  # LightGBM(승차)
lgb_alight  = models['하차']['xgb']                  # LightGBM(하차)

# 예측
y_board  = xgb_board.predict(X)
y_alight = xgb_alight.predict(X)

# 결과 생성
out_rows = []
for i, row in df.iterrows():
    day_str = str(pd.to_datetime(row['날짜']).date())
    out_rows.append({
        "날짜": day_str, "호선": row["호선"], "역명": row["역명"],
        "구분": "승차_xgb", "예측값": int(max(0, np.rint(y_board[i])))})
    out_rows.append({
        "날짜": day_str, "호선": row["호선"], "역명": row["역명"],
        "구분": "하차_xgb", "예측값": int(max(0, np.rint(y_alight[i])))})
out_df = pd.DataFrame(out_rows)

5. S3 저장

  • 예측값을 S3에 저장
    date_token = pd.to_datetime(df['날짜'].iloc[0]).strftime("%Y-%m-%d")
    out_key = f"{OUTPUT_PREFIX}/{date_token}_xgb.csv"
    _write_csv_to_s3(s3, out_df, out_key)
    

    3.LightBGM 실행

  • subwayPredictLightBGM 라는 이름의 함수로 매일 오후 10시반에 실행
  • 전 단계에서 실행된 데이터를 가지고 예측값을 산출
  • 전처리 진행 후 S3에 저장
  • 위의 XGboost와 과정 및 코드가 동일하며, 예측 부분만 다름
# 입력 데이터 로드
df = _read_csv_from_s3(s3, in_key)
if df.empty:
    return {"status": "error", "message": "입력 CSV가 비어 있음", "input_key": in_key}

# 모델/인코더/피처 로드
models      = _load_joblib(s3, MODEL_LGB_KEY)            # dict 구조
features    = _load_joblib(s3, FEATURES_KEY)         # list
le_line     = _load_joblib(s3, LINE_ENCODER_KEY)
le_station  = _load_joblib(s3, STATION_ENCODER_KEY)

lgb_board   = models['승차']['lgb']                  # LightGBM(승차)
lgb_alight  = models['하차']['lgb']                  # LightGBM(하차)

# 예측
y_board  = lgb_board.predict(X)
y_alight = lgb_alight.predict(X)

# 결과 생성
out_rows = []
for i, row in df.iterrows():
    day_str = str(pd.to_datetime(row['날짜']).date())
    out_rows.append({
        "날짜": day_str, "호선": row["호선"], "역명": row["역명"],
        "구분": "승차_lgb", "예측값": int(max(0, np.rint(y_board[i])))})
    out_rows.append({
        "날짜": day_str, "호선": row["호선"], "역명": row["역명"],
        "구분": "하차_lgb", "예측값": int(max(0, np.rint(y_alight[i])))})
out_df = pd.DataFrame(out_rows)

4. 예측값 결과 전처리

  • lightgbm_xgbosst라는 이름으로 매일 오후 11시에 실행
  • 예측한 결과물을 시각화하기 쉽게 전처리 진행

1. s3에서 데이터 불러오기

  • 가장 최신 날짜의 Xgboost, lightGBM의 데이터 가져오기
pat = re.compile(r"(?P<date>\d{4}-\d{2}-\d{2})_(?P<model>xgb|lgb)\.csv$")

def _list_prediction_keys(s3):
    """predictions/ 아래의 xgb/lgb csv 키 모두 나열"""
    pg = s3.get_paginator("list_objects_v2")
    keys = []
    for page in pg.paginate(Bucket=S3_BUCKET, Prefix=PREDICTIONS_PREFIX):
        for obj in page.get("Contents", []):
            k = obj["Key"]
            if k.endswith(".csv"):
                m = pat.search(k)
                if m:
                    keys.append(k)
    return keys

def _find_latest_pair(keys):
    """xgb와 lgb가 모두 존재하는 가장 최근 날짜의 (xgb_key, lgb_key) 반환"""
    by_date = {}
    for k in keys:
        m = pat.search(k)
        if not m:
            continue
        d = m.group("date")
        model = m.group("model")
        by_date.setdefault(d, set()).add(model)

    # 두 모델 다 있는 날짜만 후보
    candidates = [d for d, models in by_date.items() if {"xgb", "lgb"} <= models]
    if not candidates:
        return None, None, None

    latest = max(candidates)  # YYYY-MM-DD 문자열은 사전순=시간순
    # 실제 키 찾아서 리턴
    xgb_key = f"{PREDICTIONS_PREFIX}{latest}_xgb.csv"
    lgb_key = f"{PREDICTIONS_PREFIX}{latest}_lgb.csv"
    return latest, xgb_key, lgb_key

2. 데이터 유니온

  • 데이터 유니온 후 DB에 적재 ```python s3 = boto3.client(“s3”)

df_xgb = _read_csv(s3, xgb_key) df_lgb = _read_csv(s3, lgb_key)

검증 & 정리

required_cols = [“날짜”,”호선”,”역명”,”구분”,”예측값”] for name, df in [(“xgb”, df_xgb), (“lgb”, df_lgb)]: missing = [c for c in required_cols if c not in df.columns] if missing: return {“status”:”error”,”message”:f”{name} 컬럼 누락: {missing}”}

타입 보정

for df in (df_xgb, df_lgb): df[“날짜”] = pd.to_datetime(df[“날짜”]).dt.date df[“예측값”] = pd.to_numeric(df[“예측값”], errors=”coerce”).fillna(0).astype(int) df[“호선”] = df[“호선”].astype(str) df[“역명”] = df[“역명”].astype(str) df[“구분”] = df[“구분”].astype(str)

유니온

df_all = pd.concat([df_xgb, df_lgb], ignore_index=True) df_all = df_all.rename(columns={“구분”: “target_model”})

DB 저장

_write_db(df_all)

target_date = str(df_all[“날짜”].iloc[0]) return { “status”:”ok”, “date”: target_date, “rows”: int(len(df_all)), “xgb_key”: xgb_key, “lgb_key”: lgb_key, “table”: TABLE_NAME } ``` | 날짜 | 호선 | 역명 | 구분 | 예측값 | |————|——-|——————————|———–|——–:| | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 하차_lgb | 15726 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 하차_lgb | 14720 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 하차_lgb | 10149 | | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 승차_xgb | 15654 | | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 승차_lgb | 16997 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 승차_lgb | 15959 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 승차_lgb | 9813 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 승차_xgb | 10201 | | 2025-07-31 | 5호선 | 동대문역사문화공원(DDP) | 하차_xgb | 8885 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 하차_xgb | 14372 | | 2025-07-31 | 4호선 | 동대문역사문화공원(DDP) | 승차_xgb | 14861 | | 2025-07-31 | 2호선 | 동대문역사문화공원(DDP) | 하차_xgb | 18051 | | 2025-07-31 | 1호선 | 종로5가 | 하차_xgb | 27128 | | 2025-07-31 | 1호선 | 종로5가 | 승차_xgb | 26806 | | 2025-07-31 | 1호선 | 종로5가 | 하차_lgb | 25534 |