프로그램 개발
[가상화폐] 거래소 코인 시세차익 알림봇(코인봇)
Pasted
2023. 3. 27. 18:57
해당 프로그램은 웹소켓 (업비트, 바이낸스) API를 이용하여 코인 정보를 받은 다음, 코인 정보를 시세차익 기준으로 정렬하여 텔레그램 봇을 통해 사용자에게 코인의 시세 정보(차익 정보)를 제공하는 알림 프로그램이다.
- Source Code
main.py
import asyncio # 거래소 5종 모두 이용할 경우 -> , bybit, okx, bitget 추가 from api import upbit, binance import util import traceback import time import platform import requests import json import os from consts import * class Premium: def __init__(self): self.exchange_price = {} # 거래소별 가격 데이터를 저장할 딕셔너리 self.exchange_accum_trade_price = {} # 거래소별 거래대금 데이터를 저장할 딕셔너리 # 해외 거래소의 USD/USDT 가격을 저장하는 함수 # while 문을 통해 일정 주기를 기준으로 무한히 반복 async def get_usd_price(self): while True: try: data = requests.get('https://quotation-api-cdn.dunamu.com/v1/forex/recent?codes=FRX.KRWUSD').json() self.exchange_price['USD'] = {'base':data[0]['basePrice']} # BUSD/USDT -> USDT/BUSD -> (USDT) 역수를 취함 res = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BUSDUSDT') self.exchange_price['USDT'] = {'base': 1 / float(res.json()['price'])} await asyncio.sleep(DOLLAR_UPDATE) # 달러 가격 업데이트 주기 (1시간) except Exception as e: util.send_to_telegram(traceback.format_exc()) # 거래소별 코인의 누적 거래대금을 조회하여 self.exchange_accum_trade_price 에 저장하는 함수 # while 문을 통해 일정 주기를 기준으로 무한히 반복 async def check_exchange_accum_trade_price(self): while True: try: # 거래소별 connect_socket 을 통해 가져와야 할 코인 정보가 있어서 대기 await asyncio.sleep(ACCUM_TRADE_PRICE_DELAY) # 1. Upbit 누적 거래대금 확인 upbit.get_exchange_accum_trade_price(self.exchange_accum_trade_price) # 2. Binance 누적 거래대금 확인 binance.get_exchange_accum_trade_price(self.exchange_accum_trade_price, self.exchange_price) # # 3. Bybit 누적 거래대금 확인 # bybit.get_exchange_accum_trade_price(self.exchange_accum_trade_price, self.exchange_price) # # # 4. OKX 누적 거래대금 확인 # okx.get_exchange_accum_trade_price(self.exchange_accum_trade_price, self.exchange_price) # # # 5. Bitget 누적 거래대금 확인 # bitget.get_exchange_accum_trade_price(self.exchange_accum_trade_price, self.exchange_price) await asyncio.sleep(ACCUM_TRADE_PRICE_UPDATE) except Exception as e: util.send_to_telegram(traceback.format_exc()) # self.exchange_price 에 저장된 거래소별 코인 정보를 비교하고 특정 수치(%)이상 격차 발생시 알림을 전달하는 함수 async def compare_price(self): while True: try: await asyncio.sleep(COMPARE_PRICE_DELAY) # 거래소별 connect_socket 을 통해 가져와야 할 코인 정보가 있어서 대기 util.send_to_telegram('✅') base_message = '🔥 프리미엄 정보 🔥\n\n' exchange_price = self.exchange_price.copy() # 거래소에서 얻어온 가격 데이터 복사 message_dict = {} # 격차 발생시 알람을 보낼 메시지를 저장해둘 딕셔너리 message_list = [''] # message_dict 에 저장했던 메시지들을 보낼 순서대로 저장한 리스트 # 버블정렬을 이용하여 데이터를 오름차 순으로 정렬하고 코인 정보를 보여주는 과정 for key in exchange_price: if key in ['USD', 'USDT']: continue # 거래소 목록 리스트를 생성 exchange_list = list(exchange_price[key]) # 'exchange_list[i]' = 기준 거래소명, 'base_exchange' = 가격 for i in range(0, len(exchange_list) - 1): base_exchange = exchange_list[i] # 가격 데이터가 들어있는 'exchange_price'에 코인이 키 값으로 있고, 거래소 명이 'base_exchange' 에 키 값으로 있는 형태 base_exchange_price = round(float(exchange_price[key][base_exchange]), 2) if float( exchange_price[key][base_exchange]) > 0 else float(exchange_price[key][base_exchange]) if not base_exchange_price: continue for j in range(i + 1, len(exchange_list)): # 비교 거래소의 가격, 코인의 가격을 저장 # 데이터를 꺼내오는 키 값이 'base_exchange' -> 'compare_exchange' 로 바뀌고, 변수명이 변경 compare_exchange = exchange_list[j] compare_exchange_price = round(float(exchange_price[key][compare_exchange]), 2) if float( exchange_price[key][compare_exchange]) > 0 else float(exchange_price[key][compare_exchange]) # 가격 데이터가 존재하지 않다면 비교할 수 없으므로 넘어감 if not compare_exchange_price: continue # 거래소간의 가격 차이(%) diff = round((base_exchange_price - compare_exchange_price) / base_exchange_price * 100, 3) if base_exchange_price else 0 if abs(diff) > NOTI_GAP_STANDARD: message = f'{key}, {base_exchange}_{compare_exchange} 프리미엄({diff}%)\n' message += f'현재가격:{base_exchange_price:,.2f}원/{compare_exchange_price:,.2f}원\n' if self.exchange_accum_trade_price[key][base_exchange] and \ self.exchange_accum_trade_price[key][ compare_exchange]: # 거래대금 데이터가 있는 경우에만 알림 추가 message += f'거래대금:{self.exchange_accum_trade_price[key][base_exchange]:,.2f}억원/{self.exchange_accum_trade_price[key][compare_exchange]:,.2f}억원\n' message_dict[diff] = message # 알림을 보내기 전에 메시지를 정렬 message_dict = dict(sorted(message_dict.items(), reverse=True)) # 메시지 격차 발생 순으로 정렬 for i in message_dict: if len(message_list[len(message_list) - 1]) + len(message_dict[i]) < TELEGRAM_MESSAGE_MAX_SIZE: message_list[len(message_list) - 1] += message_dict[i] + '\n' else: message_list.append(message_dict[i] + '\n') message_list[0] = base_message + message_list[0] # 알림 첫줄 구분용 문구 추가 # 정렬한 메시지를 순서대로 텔레그램 알림 전송 for message in message_list: util.send_to_telegram(message) except Exception as e: util.send_to_telegram(traceback.format_exc()) async def run(self): await asyncio.wait([ # 태스크들을 비동기 실행하고 결과를 기다림 asyncio.create_task(self.get_usd_price()), # USD(달러) 데이터 조회/저장 asyncio.create_task(upbit.connect_websocket(self.exchange_price)), # Upbit websocket 연결 및 가격정보 조회/저장 asyncio.create_task(binance.connect_websocket(self.exchange_price)), # Binance websocket 연결 및 가격정보 조회/저장 # asyncio.create_task(bybit.connect_websocket(self.exchange_price)), # Bybit websocket 연결 및 가격정보 조회/저장 # asyncio.create_task(okx.connect_websocket(self.exchange_price)), # OKX websocket 연결 및 가격정보 조회/저장 # asyncio.create_task(bitget.connect_websocket(self.exchange_price)), # Bitget websocket 연결 및 가격정보 조회/저장 asyncio.create_task(self.check_exchange_accum_trade_price()), # 누적 거래대금 조회/저장 asyncio.create_task(self.compare_price()) # 가격비교 알림 ]) if __name__ == '__main__': premium = Premium() asyncio.run(premium.run()) # 비동기 함수 호출
consts.py
import os # USD (달러 환율) USD_PRICE = 1308.75 # 2023-03.19. # Exchanges (거래소 이름) UPBIT = 'Upbit' BINANCE = 'Binance' BYBIT = 'Bybit' OKX = 'OKX' BITGET = 'Bitget' # All Exchanges (거래소 리스트) EXCHANGE_LIST = [UPBIT, BINANCE] # 업비트와 바이낸스만 불러올 경우 # EXCHANGE_LIST = [UPBIT, BINANCE, BYBIT, OKX, BITGET] # 5종 거래소 모두 불러올 경우 # API KEYS (API KEY 정보) UPBIT_API_KEY = os.getenv('UPBIT_API_KEY') UPBIT_SECRET_KEY = os.getenv('UPBIT_SECRET_KEY') BINANCE_API_KEY = os.getenv('BINANCE_API_KEY') BINANCE_SECRET_KEY = os.getenv('BINANCE_SECRET_KEY') BYBIT_API_KEY = os.getenv('BYBIT_API_KEY') BYBIT_SECRET_KEY = os.getenv('BYBIT_SECRET_KEY') OKX_API_KEY = os.getenv('OKX_API_KEY') OKX_SECRET_KEY = os.getenv('OKX_SECRET_KEY') OKX_PASS = os.getenv('OKX_PASS') BITGET_API_KEY = os.getenv('BITGET_API_KEY') BITGET_SECRET_KEY = os.getenv('BITGET_SECRET_KEY') BITGET_PASS = os.getenv('BITGET_PASS') # TELEGRAM (텔레그램 정보) TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') TELEGRAM_CHAT_ID = os.getenv('TELEGRAM_CHAT_ID') TELEGRAM_MESSAGE_MAX_SIZE = 4095 # 텔레그램 메시지 최대길이 # SOCKET (웹소켓) SOCKET_PING_INTERVAL = 20 # 20초 SOCKET_RETRY_TIME = 30 # 30초 SOCKET_PING_TIMEOUT = 30 # 30초 # DELAY (딜레이, 주기) DOLLAR_UPDATE = 60 * 60 # 달러 가격 업데이트 주기 (1시간) ACCUM_TRADE_PRICE_DELAY = 3 * 60 # 누적 거래 대금조회 최초 실행대기 (3분) ACCUM_TRADE_PRICE_UPDATE = 60 * 60 # 누적 거래 대금조회 업데이트 주기 (1시간) COMPARE_PRICE_DELAY = 5 * 60 # 가격 비교 최초 실행대기 (5분) TIME_DIFF_CHECK_DELAY = 30 * 60 # 바이낸스 서버와 시간비교 주기 (30분) # ETC (누적 거래 대금) MILLION = 100000000 # 억 (단위) NOTI_GAP_STANDARD = 1.0 # 거래소간 차이가 발생할 때 알림을 보낼 기준(%)
util.pyfrom consts import * import telegram import time from datetime import datetime, timedelta bot = None # 텔레그램 봇을 통해 메시지를 전송하는 함수 def send_to_telegram(message): global bot if not bot: if TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID: bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN) else: raise Exception('TELEGRAM_PATH Undefined.') retries = 0 max_retries = 3 while retries < max_retries and bot: try: print(message) bot.send_message(text=message[:TELEGRAM_MESSAGE_MAX_SIZE], chat_id=TELEGRAM_CHAT_ID) return True except telegram.error.TimedOut as timeout: time.sleep(5 * retries) retries += 1 print('Telegram got a error! retry...') except Exception as e: bot = None retries = max_retries if retries == max_retries: bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN) print('Telegram failed to retry...') # 소켓 연결이 끊어진 경우, 이전까지 받아온 데이터들은 더이상 유효하지 않기 때문에 데이터를 삭제하는 역할을 하는 함수 (데이터 중복 삭제) # 'exchange_price' 내의 데이터를 삭제 def clear_exchange_price(exchange, exchange_price): for ticker in exchange_price: if exchange in exchange_price[ticker]: del exchange_price[ticker][exchange] # 매일 오전 9시(지정된 시간)인지 확인하여 시간이 넘었다면 소켓 초기화 (websocket 재연결 목적) def is_need_reset_socket(start_time): now = datetime.now() start_date_base_time = start_time.replace(hour=9, minute=0, second=0, microsecond=0) next_base_time = (start_time + timedelta(days=1)).replace(hour=9, minute=0, second=0, microsecond=0) if start_time < start_date_base_time: if start_date_base_time < now: return True else: return if next_base_time < now: return True else: return
upbit.py
import traceback import pyupbit import requests from consts import * import util import asyncio import websockets import json import socket from datetime import datetime import uuid import pandas as pd # Docs : https://pyupbit.readthedocs.io/en/latest/ exchange = UPBIT # 원화(KRW) 마켓과 BTC 마켓간의 중복된 정보를 삭제하고, 하나의 리스트로 통합하여 리턴하는 함수 def get_all_ticker(): # 원화 마켓의 코인 정보를 로드 krw_ticker = pyupbit.get_tickers(fiat='KRW') # BTC 마켓의 코인 정보를 로드 btc_ticker = pyupbit.get_tickers(fiat='BTC') # BTC 마켓에 있는 코인목록 중에 원화 마켓에 있는 정보를 삭제 only_in_btc = [ticker for ticker in btc_ticker if 'KRW-' + ticker.split('-')[1] not in krw_ticker] # 중복없는 코인목록을 만들기 위해 원화 마켓에 있는 코인목록과 'only_in_btc' 코인목록을 합쳐서 반환 return krw_ticker + only_in_btc # 거래대금 정보를 리턴하는 함수 def get_exchange_accum_trade_price(exchange_accum_trade_price): print(exchange + " get_exchange_accum_trade_price") # BTC 가격 불러오기. btc_price = pyupbit.get_current_price('KRW-BTC') # 전체 누적 거래대금 조회. res = requests.get('https://api.upbit.com/v1/ticker', {'markets': get_all_ticker()}).json() for i in res: ticker = i['market'].split('-')[1] # 코인 티커 (KRW-BTC 중 BTC) currency = i['market'].split('-')[0] # 코인 티커 (KRW-BTC 중 KRW) # ticker(코인 정보)가 아직 Dictionary 에 저장되어 있지 않다고 하면 초기화 if ticker not in exchange_accum_trade_price: # ex) exchange_accum_trade_price[symbol] = {'Upbit': None, 'Binance': None, 'Bybit': None, 'OKX': None, 'Bitget': None} exchange_accum_trade_price[ticker] = dict.fromkeys(EXCHANGE_LIST, None) # currency(기준 통화)가 만약에 'BTC'이면 값을 저장할 때 if currency == 'BTC': # 누적 거래대금에 BTC 가격을 곱하고, 억 단위로 표현하기 위해서 나누기 억을하고 반올림 exchange_accum_trade_price[ticker][exchange] = round(i['acc_trade_price_24h'] * btc_price / MILLION, 2) # currency(기준 통화)가 'BTC'가 아니면 / KRW(원화)일 때 else: # 누적 거래대금에 BTC 가격을 곱하지 않음 exchange_accum_trade_price[ticker][exchange] = round(i['acc_trade_price_24h'] / MILLION, 2) # 웹소켓 연결 이후, 수신한 가격 데이터를 'exchange_price'에 저장하는 함수 async def connect_websocket(exchange_price): while True: try: util.send_to_telegram('[{}] Creating new connection...'.format(exchange)) util.clear_exchange_price(exchange, exchange_price) start_time = datetime.now() # Connect(연결) async with websockets.connect('wss://api.upbit.com/websocket/v1', ping_interval=None, ping_timeout=None) as websocket: # 수신하고 싶은 시세 정보를 나열하는 필드(포맷) subscribe_fmt = [ {'ticket': str(uuid.uuid4())[:6]}, { 'type': 'ticker', 'codes': get_all_ticker(), 'isOnlyRealtime': True }, ] # 데이터를 json 포맷으로 변환 subscribe_data = json.dumps(subscribe_fmt) # Send(송신)를 통해 웹소켓 서버에서 어떤 데이터를 요청하는지 알 수 있음 await websocket.send(subscribe_data) # Receive(수신)를 통해 데이터를 받아올 수 있음 while True: try: data = await websocket.recv() data = json.loads(data) if 'code' not in data: # 응답 데이터(딕셔너리)에 code가 없는 경우 제외 print('[Data error]', data) continue currency = data['code'].split('-')[0] # KRW-BTC -> KRW(기준 통화) ticker = data['code'].split('-')[1] # KRW-BTC -> BTC(시세조회 대상 코인) # print(ticker, data) # 결과 출력 테스트 (※ 프로그램을 실행할 때에는 주석처리 필수) # print(ticker, exchange_price) # 딕셔너리 출력 테스트 (※ 프로그램을 실행할 때에는 주석처리 필수) if ticker not in exchange_price: # 'exchange_price' 에 코인이 존재하지 않다면 거래소 코인 초기화 exchange_price[ticker] = {exchange: None} if currency == 'BTC': # 기준 통화가 BTC인 경우는 "현재 가격 * BTC 가격" = 원화 환산 가격 if 'BTC' in exchange_price: # 변수 데이터중에 'trade_price' 에 현재 가격이 있으므로 아래와 같이 저장 # 만약에 'exchange_price' 에 비트코인 가격이 있다면 비트코인 가격과 같이 곱해주고, # 비트코인 가격이 없다고 하면, 계산을 할 수 없으므로 0을 곱해 가격을 0으로 만듬 exchange_price[ticker][exchange] = float(data['trade_price']) * \ float(exchange_price['BTC'][exchange]) if exchange_price['BTC'] and exchange in exchange_price['BTC'] else 0 else: # 기준 통화가 원화인 경우, 현재 가격(trade_price) 그대로 저장 exchange_price[ticker][exchange] = float(data['trade_price']) if util.is_need_reset_socket(start_time): # 매일 아침 9시(지정된 시간)에 소켓 재연결 util.send_to_telegram('[{}] Time to new connection...'.format(exchange)) break # 만약에 에러가 발생하면 에러를 출력하고, 'SOCKET_RETRY_TIME' 만큼 대기를 했다가 break 후, 처음부터 다시 진행 except Exception as e: print(traceback.format_exc()) await asyncio.sleep(SOCKET_RETRY_TIME) break await websocket.close() except Exception as e: print(traceback.format_exc()) await asyncio.sleep(SOCKET_RETRY_TIME) if __name__ == '__main__': exchange_accum_trade_price = {} get_exchange_accum_trade_price(exchange_accum_trade_price) print(exchange_accum_trade_price)
binance.py
import traceback import requests from consts import * import util import asyncio import websockets import json import socket from datetime import datetime # Docs : https://binance-docs.github.io/apidocs/spot/en/ exchange = BINANCE def get_all_ticker(): # requests.get 을 이용해 요청. res = requests.get('https://api.binance.com/api/v3/exchangeInfo') # 응답이 오면 응답을 json 형태로 변환 res = res.json() # @miniTicker 를 붙이는 이유 # : 웹소켓에서 데이터를 요청할 때, 페어명을 이처럼 붙여서 보내달라고 하기 때문 return [s['symbol'].lower() + '@miniTicker' for s in res['symbols'] if 'USDT' in s['symbol']] # 거래대금 정보를 리턴하는 함수 # 해외 거래소는 파라미터를 하나 더 받음 -> exchange_price : 거래소 별로 코인 가격을 저장하고 있을 예정 # exchange_price 파라미터가 필요한 이유? -> 해외 거래소에 상장된 코인들은 USDT로 구매 가능한 USDT 페어들을 대상으로 프리미엄을 확인 # 원 달러 가격을 exchange_price 에 저장하고 조회 def get_exchange_accum_trade_price(exchange_accum_trade_price, exchange_price): print(exchange + " get_exchange_accum_trade_price") # 'exchange_price' 에서 USD 를 키로 갖고, 딕셔너리로 'base' 라는 곳에 달러 가격을 저장 # 만약에 달러 가격이 저장되어 있지 않다고 하면(연동되어 있지 않으면), 계산이 불가하기 때문에 0 USD = exchange_price['USD']['base'] if 'USD' in exchange_price else 0 # 'exchange_price' 에서 USDT 를 키로 갖고, 딕셔너리로 'base' 라는 곳에 USDT 가격을 저장 # 만약에 달러 가격이 저장되어 있지 않다고 하면(연동되어 있지 않으면), 에러없이 계산되기 위해서 1 USDT = exchange_price['USDT']['base'] if 'USDT' in exchange_price else 1 # API 주소로 요청을 보내면 바이낸스에 상장된 현물의 24시간 거래누적 대금을 조회하여 'res' 에 저장 res = requests.get('https://api.binance.com/api/v3/ticker/24hr').json() for i in res: if i['symbol'].endswith('USDT'): ticker = i['symbol'].split('USDT')[0] # ticker(코인 정보)가 아직 딕셔너리에 저장되어 있지 않다고 하면 초기화 if ticker not in exchange_accum_trade_price: # ex) exchange_accum_trade_price[symbol] = {'Upbit': None, 'Binance': None, 'Bybit': None, 'OKX': None, 'Bitget': None} exchange_accum_trade_price[ticker] = dict.fromkeys(EXCHANGE_LIST, None) # 누적 거래대금(quoteVolume)에 USD(달러) 가격을 곱해주고, USD와 USDT 간의 가격의 격차를 줄이기 위하여 USDT 를 곱함 exchange_accum_trade_price[ticker][exchange] = round(float(i['quoteVolume']) * USD * USDT / MILLION, 2) # 소켓 연결 후, 실시간 가격까지 저장하는 함수 # exchange_price : 거래소별 가격 데이터를 저장할 딕셔너리 async def connect_websocket(exchange_price): while True: try: util.send_to_telegram('[{}] Creating new connection...'.format(exchange)) util.clear_exchange_price(exchange, exchange_price) start_time = datetime.now() # Connect(연결) async with websockets.connect('wss://stream.binance.com:9443/ws', ping_interval=None, ping_timeout=None) as websocket: # 수신하고 싶은 시세 정보를 나열하는 필드(포맷) params_ticker = [] tickers = get_all_ticker() for idx, ticker in enumerate(tickers): params_ticker.append(ticker) # 한꺼번에 데이터를 보내면 에러가 나기 때문에, 50개씩 나눠서 웹소켓에 요청 if len(params_ticker) > 50 or idx == len(tickers) - 1: subscribe_fmt = { 'method': 'SUBSCRIBE', 'params': params_ticker, 'id': 1 } # 데이터를 json 포맷으로 변환 subscribe_data = json.dumps(subscribe_fmt) # Send(송신)를 통해 웹소켓 서버에서 어떤 데이터를 요청하는지 알 수 있음 await websocket.send(subscribe_data) await asyncio.sleep(1) params_ticker = [] # Receive(수신)를 통해 데이터를 받아올 수 있음 while True: try: data = await websocket.recv() data = json.loads(data) ticker = data['s'].replace('USDT', '') if 's' in data else None if not ticker: # ticker 가 없는 데이터의 경우 저장 불가 continue # print(ticker, data) # 결과 출력 테스트 (※ 프로그램을 실행할 때에는 주석처리 필수) # print(ticker, exchange_price) # 딕셔너리 출력 테스트 (※ 프로그램을 실행할 때에는 주석처리 필수) if ticker not in exchange_price: # 'exchange_price' 에 코인이 존재하지 않다면 거래소 코인 초기화 exchange_price[ticker] = {exchange: None} exchange_price[ticker][exchange] = float(data['c']) * (exchange_price['USD']['base'] if 'USD' in exchange_price else 0) \ * (exchange_price['USDT']['base'] if 'c' in data and 'USDT' in exchange_price else 1) if util.is_need_reset_socket(start_time): # 매일 아침 9시에 소켓 재연결 util.send_to_telegram('[{}] Time to new connection...'.format(exchange)) break # 만약에 에러가 발생하면 에러를 출력하고, 'SOCKET_RETRY_TIME' 만큼 대기를 했다가 break 후, 처음부터 다시 진행 except Exception as e: print(traceback.format_exc()) await asyncio.sleep(SOCKET_RETRY_TIME) break await websocket.close() except Exception as e: print(traceback.format_exc()) await asyncio.sleep(SOCKET_RETRY_TIME) if __name__ == '__main__': exchange_accum_trade_price = {} exchange_price = {'USD': {'base': USD_PRICE}, 'USDT': {'base': 1}} get_exchange_accum_trade_price(exchange_accum_trade_price, exchange_price) print(exchange_accum_trade_price)