코딩하는 문과생

[Python] 파이썬 ORM 패키지, sqlalchemy 본문

프로그래밍/Python

[Python] 파이썬 ORM 패키지, sqlalchemy

코딩하는 문과생 2021. 4. 13. 23:56

[개요]

API로 호출한 데이터를 DB에 저장할 일이 생겼다.

약 16,000개 기업에 대해, API로 조회한 약 5년치 주가를 테이블에 저장하는 것인데,  데이터의 양은 대략 16,000 * 5년 * 약 200일(주가시장 거래일) = 약 1,600만 정도다.

(절대로 무시할 양이 아니다...)

 

[이전 코드]

처음엔 생각나는 대로 코드를 작성했는데, 결론적으로 처음 작성한 코드는 사용이 불가했다.

그 이유는 시간이 오래걸려서 인데, 한 회사의 주가를 가져와 저장하는 데 약 1분정도 소요되었다.

작성했던 코드를 간략히 설명하자면 

1. API를 호출해 DataFrame에 담는다.

2. 테이블의 최신날짜를 조회한다.

  2-1. 데이터가 없다면 from_date을 2016년 1월 1일로 설정

  2-2. 데이터가 있다면 현재날짜기준 20일 전으로 설정(매일 배치가 돌 예정이므로 20일전만 조회한다.)

3. 커서와 for을 이용해 날짜가 from_date 이후인 주가데이터를 생성된 테이블에 저장한다.

with self.conn.cursor() as curs:
	for r in df_stocks.itertuples():
		sql = f"REPLACE INTO ST_PRICE_DAY VALUES ('{market_cd}', '{stock_cd}', '{r.Index}', {r.Open}, {r.High}, {r.Low}, {r.Close}, {round(r.Change,4)}, {r.Volume})"
		curs.execute(sql)
	self.conn.commit()

 

여기서 문제는 DB연결 후 커서를 잡아서 replace into 쿼리를 돌리는 것인데, 커서를 잡아서 for을 이용해 쿼리를 돌리다보니 시간이 엄청 걸렸고, 대책이 필요했다. 

 

여러가지 방면으로 찾던 중, Pandas의 Dataframe 구조를 이용해 바로 테이블을 생성 or 데이터를 삽입하는 sqlalchemy 라는 패키지를 발견했고, 이를 이용해 전 데이터 저장하는 것을 약 1시간 정도로 단축시킬 수 있었다.

(물론 1시간도 짧은 시간은 아니지만, 이전 코드에 비하면 상당한 발전이라 생각한다...ㅎ)

 

[sqlalchemy]

- 파이썬에서 사용가능한 ORM이다.

...and ORM working styles ...

sqlalchemy docs에 들어가면 뭐라뭐라 설명이 많은데, 결국 sqlalchemy는 파이썬에서 지원해주는 객체와 관계를 연결해주는 ORM이다.

 

sqlalchemy는 저장된 Dataframe을 가지고 그대로 테이블을 생성 또는 테이블에 삽입하기 때문에 커서를 사용하는 방법보다 훨씬 빠르게 데이터의 저장이 가능하다. 

 

[변경된 코드]

작성한 코드 중 일부를 가져왔는데, sqlalchemy를 이용해 데이터를 훨씬 빠르게 저장할 수 있었다.

from sqlalchemy import create_engine

engine = create_engine(f'mysql+pymysql://{timing_db_info.user}:{timing_db_info.passwd}@{timing_db_info.db_url}/{timing_db_info.db_name}')

with engine.connect() as connection:
	connection.execute(f"DELETE FROM ST_PRICE_DAY WHERE stock_date >= '{from_date}'")
	df_prices.to_sql(name='ST_PRICE_DAY', con=connection, if_exists='append', index=False,
	dtype={	'market_cd': sqlalchemy.types.VARCHAR(100),
			'stock_cd': sqlalchemy.types.VARCHAR(100),
			'stock_date': sqlalchemy.types.VARCHAR(100),
			'open': sqlalchemy.types.DECIMAL(24, 6),
			'high': sqlalchemy.types.DECIMAL(24, 6),
			'low': sqlalchemy.types.DECIMAL(24, 6),
			'close': sqlalchemy.types.DECIMAL(24, 6),
			'diff': sqlalchemy.types.DECIMAL(24, 6),
			'volume': sqlalchemy.types.DECIMAL(24, 6),
			'last_update': sqlalchemy.DateTime()})

create_enigine을 이용해 생성된 engine은 connection pool을 유지한다.

connect() 또는 begin() 또는 execute()를 이용해 pool에 있는 Connection 객체를 얻을 수 있다. 

 

다양한 옵션을 부여해 테이블을 생성할 수 있는데, 일부 소개하자면

  • dtype: 생성되는 테이블의 type을 지정
  • name: 테이블 이름 지정
  • index: dataframe의 인덱스를 그대로 사용할 지 정할 수 있다.
  • if_exists: fail(default)은 ValueError를 발생시킨다는 의미를, replace는 기존 테이블을 모두 삭제하고 새롭게 테이블을 생성한다는 의미를, append는 기존 테이블에 이어서 dataframe을 insert를 한다는 의미를 가지고 있다. 다시말해 MySql에서 사용하는 REPLACE INTO 구문은 사용이 불가하고, 구현하고 싶다면 DELETE 를 이용해 INSERT하고자 하는 데이터를 삭제한 후 INSERT를 진행하면 된다.

등이 있다.