Spaces:
Runtime error
Runtime error
| from collections import defaultdict | |
| import numpy as np | |
| import scipy as sp | |
| import calculator | |
| import data_retriever | |
| import streamlit as st | |
| # plot the trend of the market as a candlestick graph. | |
| def sector_trends(symbol, weeks_back): | |
| # 1. get all peers | |
| peers = data_retriever.get_peers(symbol) | |
| # 2. get data for each peer | |
| peers_stock_data = defaultdict(list) | |
| dates = [] | |
| for peer in peers: | |
| peer_data = data_retriever.get_current_stock_data(peer, weeks_back) | |
| if len(peer_data) == 0: | |
| continue | |
| if not any(dates): | |
| dates = peer_data.index.format() | |
| peers_stock_data[peer] = peer_data | |
| # 3. normalize all data (get min->max value, then set each value to (X-min)/(max-min) | |
| peers_stock_data_normalized = defaultdict(dict) | |
| indicator_stock_data_normalized_peer_sum = defaultdict(list) | |
| for peer, peer_stock_data in peers_stock_data.items(): | |
| peer_stock_data_normalized = defaultdict(list) | |
| for indicator in peer_stock_data: | |
| indicator_normalized = peer_stock_data_normalized[indicator] | |
| indicator_stock_data_normalized_sum = indicator_stock_data_normalized_peer_sum[indicator] | |
| indicator_values = peer_stock_data[indicator] | |
| min_val = np.min(indicator_values) | |
| max_val = np.max(indicator_values) | |
| delta = max_val - min_val | |
| value_idx = 0 | |
| for value in indicator_values: | |
| normalized = (value - min_val)/delta | |
| indicator_normalized.append(normalized) | |
| while len(indicator_stock_data_normalized_sum) <= value_idx: | |
| indicator_stock_data_normalized_sum.append(0) | |
| indicator_stock_data_normalized_sum[value_idx] += normalized | |
| value_idx += 1 | |
| peers_stock_data_normalized[peer] = peer_stock_data_normalized | |
| # 4. get the average value for each indicator [open, close, high, low] for each time-step. | |
| peer_count = len(peers) | |
| indicator_stock_data_normalized_peer_avg = defaultdict(list) | |
| for indicator, indicator_sum_values in indicator_stock_data_normalized_peer_sum.items(): | |
| indicator_avg_values = indicator_stock_data_normalized_peer_avg[indicator] | |
| for sum_value in indicator_sum_values: | |
| indicator_avg_values.append(sum_value/peer_count) | |
| # 5. plot the resulting normalized-averaged-indicators in a candlestick chart. | |
| close_data = defaultdict(list) | |
| for peer, peer_data_normalized in peers_stock_data_normalized.items(): | |
| close_data[peer] = peer_data_normalized['Close'] | |
| relative_close_data = defaultdict(list) | |
| for peer in peers_stock_data_normalized: | |
| relative_close_data[peer] = [a - b for a, b in zip(close_data[peer], indicator_stock_data_normalized_peer_avg['Close'])] | |
| return dates, close_data, relative_close_data, indicator_stock_data_normalized_peer_avg | |
| def trend_line(date_indices, data_list, min_trend_size=7, step_size=1, sigma_multiplier=1): | |
| trend_dates, trend_values = [], [] | |
| if not any(date_indices) or not any(data_list): | |
| return trend_dates, trend_values | |
| np_dates = np.array([ts.timestamp() for ts in date_indices]) | |
| dates = date_indices.format() | |
| start_index = 0 | |
| index = min_trend_size | |
| while index < len(data_list): | |
| np_dates_subset = np_dates[start_index:index] | |
| np_values_subset = data_list[start_index:index] | |
| # for the value range, calculate linear_regression and standard deviation | |
| m, c = calculator.linear_regression_line(np_dates_subset, np_values_subset) | |
| predicted_points = m * np_dates_subset + c | |
| relative_mean = np.mean(np.abs(predicted_points - np_values_subset)) # TODO: use Welford's algorithm | |
| # for the next datapoint(s), calculate the next value in that trend, | |
| # and check if it is n*trend_sigma away from the current trend line | |
| next_index = index + step_size | |
| if next_index >= len(data_list): | |
| break | |
| x = np_dates[next_index:next_index+1] | |
| y = data_list[next_index:next_index+1] | |
| expected_y = m*x + c | |
| dev = np.mean(np.abs(y - expected_y)) | |
| if dev > relative_mean * sigma_multiplier: | |
| # store the current date and value as a trend changer | |
| trend_dates.append(dates[index]) | |
| trend_values.append(data_list[index]) | |
| # reset the calculation for the next data | |
| start_index = next_index - min_trend_size | |
| index = next_index | |
| return trend_dates, trend_values | |
| def vwap(symbol_price_data): | |
| typical_price = (symbol_price_data['High'] + symbol_price_data['Low'] + symbol_price_data['Close']) / 3.0 | |
| volume = symbol_price_data['Volume'] | |
| vwap_values = (typical_price * volume).cumsum() / volume.cumsum() | |
| return vwap_values | |
| def sma(prices, period=20): | |
| values = [] | |
| for index in range(len(prices) - period): | |
| values.append(np.mean(prices[index:index + period])) | |
| return values | |
| def bollinger_bands(dates, symbol_price_data, n_days_deviation=20, deviation_multiplier=2): | |
| bollinger_dates, bollinger_high, bollinger_low = [], [], [] | |
| closes = symbol_price_data['Close'] | |
| for index in range(len(closes) - n_days_deviation): | |
| bollinger_dates.append(dates[index + n_days_deviation]) | |
| closes_subset = closes[index:index + n_days_deviation] | |
| sma = np.sum(closes_subset)/len(closes_subset) | |
| sigma = deviation_multiplier * np.sqrt(np.sum(np.power(closes_subset - sma, 2))/len(closes_subset)) | |
| bollinger_high.append(sma + sigma) | |
| bollinger_low.append(sma - sigma) | |
| return bollinger_dates, bollinger_low, bollinger_high | |
| def support_lines(): | |
| a = 1 | |
| # a linear regression over a period | |
| def linear_regression_trend(prices, period=32): | |
| np_dates = np.array([ts.timestamp() for ts in prices.index]) | |
| np_values = np.array(prices) | |
| value_count = len(prices) | |
| trend = [] | |
| for index in range(value_count-period): | |
| np_dates_subset = np_dates[index:index+period+1] | |
| np_values_subset = np_values[index:index+period+1] | |
| low, high = calculator.linear_regression(np_dates_subset, np_values_subset) | |
| trend.append(high) | |
| return trend | |
| def bids(dates, symbol_prices, period=14): | |
| close_values = symbol_prices['Close'] | |
| trend = linear_regression_trend(close_values, period=period) | |
| sma_values = sma(close_values, period=period) | |
| # get intercepts | |
| dates_subset = dates[period:] | |
| close_subset = close_values[period:] | |
| sma_intercept_indices = calculator.intercepts(sma_values, close_subset) | |
| trend_intercept_indices = calculator.intercepts(trend, close_subset) | |
| sma_intercept_dates = [dates_subset[index] for index in sma_intercept_indices] | |
| bids = [] | |
| for index in sma_intercept_indices: | |
| is_up = sma_values[index] > close_subset[index] | |
| if is_up: | |
| bids.append(10) | |
| else: | |
| bids.append(-10) | |
| return sma_intercept_dates, bids | |