업비트 API를 이용하여 암포화폐 데이터를 수집하고 SQLite 데이터베이스에 저장하는 파이썬 스크립트에 대한 글을 주말을 이용해 남겨보려 합니다. 일전에 자동 거래를 공부하면서, 머신러닝으로 예측할 수 있는 모델을 만들어 보고 싶었습니다. 그래서 그 첫 단계로 데이터를 수집할 방법을 찾으면서 작성했던 코드로 정리를 하려 합니다. 머신러닝 모델에 대한 공부를 다시 시작 하며서, 오래전에 작성했던 코드에 좀 더 익숙해 지려는 의도도 있습니다. 🙂
import requests import sqlite3 import json import time from datetime import datetime
이 부분은 스크립트 실행에 필요한 모듈들을 임포트합니다. ‘request’는 API호출을, ‘sqlite3’는 데이터베이스 관리를, ‘json’은 데이터 포맷 변환을, ‘time’과 ‘datetime’은 시간 관련 기능을 담당합니다.
upbit_endpoint = "https://api.upbit.com/v1/candles/minutes/60" conn = sqlite3.connect('upbit_data.db') cursor = conn.cursor()
업비트 API의 주소를 설정하고, SQLite 데이터베이스에 연결한 후 커서 객체를 생성합니다.
coins = ['ADA'] markets_upbit = [f'KRW-{coin}' for coin in coins]
수집할 코인을 지정합니다. 여기서는 에이다(ADA)를 예로 들었습니다. 다른 코인들에 대한 데이터를 동시에 받고 싶으면, coins 변수에 거래소에서 사용되는 coin 명을 추가해 주면 됩니다. (예_ ‘BTC’)
# set data count count = 1000
최근의 1000개 60분봉 데이터를 받아서 학습 모델을 생성해보려고 우선 count를 1000으로 선언해 주었다.
필요에 따라 수량을 변경해가면서 다운 받을 수 있다.
for i in range(len(coins)): # API 호출 및 데이터 처리 코드
각 코인에 대해 반복문을 돌면서 데이터를 수집하고 처리합니다. API에서 제공하는 데이터는 JSON 형식으로 반환되며, 이를 파싱하여 데이터베이스에 저장합니다. ‘coints’에는 수집하고자 하는 암호화폐의 심볼(예:’ADA’)이 저장되어 있습니다. ‘range(len(coins))’는 0부터 ‘coins’리스트의 길이까지의 숫자를 생성하여, 각 코인에 순차적으로 접근할 수 있게 합니다.
table_name_upbit = market_upbit.replace('-', '_') cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name_upbit} (timestamp INTEGER, open REAL, high REAL, low REAL, close REAL, volume REAL)")
데이터베이스에 코인별 테이블을 생성합니다. 테이블 이름은 코인의 시장 식별자(‘market_upbit’, 예:’KRW-ADA’)에서 하이픈(‘-‘)을 밑줄(‘_’)로 바꾸어 사용합니다. ‘CREAT TABLE IF NOT EXISTS’ SQL 구문은 해당 이름의 테이블이 이미 존재하지 않는 경우에만 새로운 테이블을 생성하도록 합니다.
querystring_upbit = {"market": market_upbit, "count": count} headers_upbit = {"Accept": "application/json"}
업비트 API에 요청을 보내기 위한 준비를 합니다. ‘market’은 어떤 시장(코인)의 데이터를 요청할지, ‘count’는 몇 개의 데이터를 가져올지를 지정합니다. ‘headers_upbit’는 API가 JSON 형식의 응답을 반환하도록 지정합니다.
while len(data_upbit) < count: response_upbit = requests.get(upbit_endpoint, params=querystring_upbit, headers=headers_upbit)
API 에서 충분한 양의 데이터를 수집할 때까지 요청을 반복합니다. 만약 요청한 데이터 개수(‘count’)를 충족하지 못했다면, 마지막 캔들의 시간(‘last_timestamp’)을 사용하여 추가 데이터를 요청합니다.
for item in data_upbit: timestamp = datetime.strptime(item['candle_date_time_kst'], "%Y-%m-%dT%H:%M:%S") cursor.execute(f"INSERT INTO {table_name_upbit} VALUES (?, ?, ?, ?, ?, ?)", (timestamp, item['opening_price'], item['high_price'], item['low_price'], item['trade_price'], item['candle_acc_trade_volume']))
수집된 데이터를 데이터베이스 테이블에 저장합니다. 각 데이터 항목은 ‘item’딕셔너리에서 추출하여 적절한 형식으로 변환된 수, ‘INSERT’ SQL 구문을 사용하여 테이블에 추가됩니다.
for i in range(len(coins)): market_upbit = markets_upbit[i] table_name_upbit = market_upbit.replace('-', '_') cursor.execute(f"SELECT * FROM {table_name_upbit} ORDER BY timestamp ASC") sorted_data = cursor.fetchall()
데이터베이스에서 데이터를 선택하고, 시간 순서대로 정렬하는 작업을 추가하였습니다.
cursor.execute(f"DELETE FROM {table_name_upbit}")
정렬된 데이터를 다시 삽ㅇ입하기 전에, 해당 테이블의 기존 데이터를 모두 삭제합니다. 이는 중복을 방지하고, 데이터의 일관성을 유지하는 데 중요합니다.
for item in sorted_data: cursor.execute(f"INSERT INTO {table_name_upbit} VALUES (?, ?, ?, ?, ?, ?)", item)
각 데이터 항목(‘item’)을 데이터베이스 테이블에 다시 삽입합니다. 여기서 사용된 ‘INSERT’ SQL 구문은 데이터의 삽입을 처리하며, ‘item’은 ‘sorted_data’에서 추출된 각 행을 나타냅니다. 이는 데이터가 시간 순서대로 데이터베이스에 저장되도록 보장합니다.
파이썬 및 머신러닝과 관련한 좋은 인사이트가 좀 더 쌓이면, 도움이 될 수 있는 글들을 다시 남기도록 하겠습니다. 🙂
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Jun 19 18:09:55 2023 """ import requests import sqlite3 import json import time from datetime import datetime # API end point upbit_endpoint = "https://api.upbit.com/v1/candles/minutes/60" # database connection conn = sqlite3.connect('upbit_data.db') cursor = conn.cursor() # coin list coins = ['ADA'] markets_upbit = [f'KRW-{coin}' for coin in coins] # set data count count = 1000 for i in range(len(coins)): coin = coins[i] market_upbit = markets_upbit[i] table_name = coin.lower() # Table creation table_name_upbit = market_upbit.replace('-', '_') cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name_upbit} (timestamp INTEGER, open REAL, high REAL, low REAL, close REAL, volume REAL)") # Upbit API call querystring_upbit = {"market": market_upbit, "count": count} headers_upbit = {"Accept": "application/json"} data_upbit = [] while len(data_upbit) < count: response_upbit = requests.get(upbit_endpoint, params=querystring_upbit, headers=headers_upbit) try: response_upbit.raise_for_status() data_upbit += json.loads(response_upbit.text) if len(data_upbit) < count: last_timestamp = data_upbit[-1]['candle_date_time_utc'] querystring_upbit['to'] = last_timestamp # 5sec sleep time.sleep(5) except requests.exceptions.RequestException as e: print(f"Error occurred while fetching data for {market_upbit}: {e}") break # data save (Upbit) for item in data_upbit: timestamp = item['candle_date_time_kst'] # time stamp for korea time(KST) timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S") open_price = item['opening_price'] high_price = item['high_price'] low_price = item['low_price'] close_price = item['trade_price'] volume = item['candle_acc_trade_volume'] # save data(Upbit) cursor.execute(f"INSERT INTO {table_name_upbit} VALUES (?, ?, ?, ?, ?, ?)", (timestamp, open_price, high_price, low_price, close_price, volume)) #for market_name in markets: for i in range(len(coins)): market_upbit = markets_upbit[i] table_name_upbit = market_upbit.replace('-', '_') cursor.execute(f"SELECT * FROM {table_name_upbit} ORDER BY timestamp ASC") sorted_data = cursor.fetchall() # delete current data cursor.execute(f"DELETE FROM {table_name_upbit}") # insert arranged data for item in sorted_data: cursor.execute(f"INSERT INTO {table_name_upbit} VALUES (?, ?, ?, ?, ?, ?)", item) # commit and connection close conn.commit() conn.close()
목적 - 장수풍뎅이 애벌레의 움직임을 관찰하기 위해서 장수 풍뎅이 애벌래가 한 식구가 되었지만, 도무지 이…