Moving Average Optimisation#

New user? The best way to learn SigTech is to follow the steps in our user guide.

Restricted data: You will only have access to the data used in this notebook if your organisation has specifically purchased it. To check your current data access, view Data. If you would like to access more data, please contact sales@sigtech.com.

Changes will not be saved: Edits made to SigTech’s example notebooks, such as this one, only persist for the duration of your session. When you restart the research environment, this notebook will be restored to its original state and any changes you have made will be lost. To make permanent changes, copy and paste the content into a new notebook in one of your workspaces.

Introduction#

The purpose of this notebook is to show how to optimise an investment strategy whose signal is defined by a moving average cross-over / cross-under, where the parameter being optimised is the backward-looking window of the moving average.
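Before turning to the framework code, here is a minimal, framework-independent sketch of the signal itself, using synthetic prices and hypothetical window lengths purely for illustration: the signal is +1 while the short moving average (SMA) sits above the long moving average (LMA) and -1 otherwise, and it is the length of these backward-looking windows that the notebook later optimises.

import numpy as np
import pandas as pd

# Synthetic price series (illustration only - the notebook itself uses rolling futures strategies)
rng = np.random.default_rng(42)
prices = pd.Series(100 + rng.normal(0, 1, 500).cumsum(),
                   index=pd.bdate_range('2019-01-01', periods=500))

sma = prices.rolling(window=10).mean()   # short moving average (hypothetical window)
lma = prices.rolling(window=40).mean()   # long moving average (hypothetical window)

signal = (2 * (sma > lma) - 1)[lma.notna()]  # +1 while the SMA is above the LMA, -1 otherwise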

Environment#

This section imports the relevant internal and external libraries and sets up the platform environment.
[ ]:
import sigtech.framework as sig
from sigtech.framework.analytics.performance.metrics import sharpe_ratio
from sigtech.framework.analytics.performance.performance_report import View
from sigtech.framework.schedules import SchedulePeriodic
from sigtech.framework.signal.library.allocation import normalize_long_short, identity

import numpy as np
import pandas as pd
import datetime as dtm
import seaborn as sns
import matplotlib.pyplot as plt

from scipy import stats
from uuid import uuid4
from itertools import product

sns.set(rc={'figure.figsize': (18, 6)})

Universe#

[ ]:
START_DATE = dtm.date(2010, 1, 4)
END_DATE = dtm.date(2021, 5, 14)
ANNUALISATION_FACTOR = 252
CURRENCY = 'USD'
REBALANCE_CALENDAR = 'SOM'
FITTING_WINDOW = 252
SMA_PERIODS = [5, 10, 15]
LMA_PERIODS = [20, 40, 60]
[ ]:
env = sig.init(env_date=END_DATE)
env.set_shared_memory(on=True)
env[sig.config.IGNORE_T_COSTS] = False
env[sig.config.T_COST_OVERRIDES] = {
    'Future': ('fixed_relative_cost', {'spread': 1 / 10000}),
}
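The override above charges a fixed relative cost with a spread of 1/10000 (one basis point) on every Future. As a rough illustration of the magnitude, and assuming for the sake of the example that the full relative spread is applied to the traded notional (the framework's exact convention may differ):

notional_traded = 1_000_000        # hypothetical trade size in USD
relative_spread = 1 / 10000        # one basis point, as configured above
assumed_cost = notional_traded * relative_spread  # 100.0 USD under this simple assumption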
[ ]:
CONTRACT_SECTORS = {
    'CD': 'CURNCY',
    'EC': 'CURNCY',
    'SF': 'CURNCY',
    'NV': 'COMDTY',
}
[ ]:
def get_rolling_futures_strategy(code):
    """
    Create a front-month rolling futures strategy for the given contract code
    """
    print(f'Creating Strategy for: {code}')
    strategy = sig.RollingFutureStrategy(
        currency=CURRENCY,
        start_date=START_DATE,
        contract_code=code,
        contract_sector=CONTRACT_SECTORS.get(code),
        rolling_rule='front',
        front_offset='-6,-5')
    return strategy
[ ]:
rf_strategies = [get_rolling_futures_strategy(code) for code in CONTRACT_SECTORS]
sig.calc_history(rf_strategies)
INSTRUMENT_MAPPING = {s.name.split()[1]: s.name for s in rf_strategies}
[ ]:
histories = pd.concat({s: sig.obj.get(v).history() for s, v in INSTRUMENT_MAPPING.items()}, axis=1).dropna()

Strategy#

[ ]:
def get_conviction_level(difs):
    """
    Return dataframe of conviction levels (0-4) from percentile ranks of the dif inputs
    """
    idx = difs.index[FITTING_WINDOW-1:]
    results = {}
    for c in difs:
        temp = {}
        dif = difs[c]

        for d, v in dif.loc[dif.index.isin(idx)].items():
            trailing = dif.loc[:d].tail(FITTING_WINDOW)
            temp[d] = stats.percentileofscore(trailing, v)
        results[c] = pd.Series(temp)

    # Map percentiles (0-100) into five conviction buckets: 0, 1, 2, 3, 4
    return np.minimum(4, pd.DataFrame(results) // 20)


def get_conviction_signal(signal, number_positions=5, size_factor=0.25):
    """
    Alter signal to place no more than number_positions on a given day
    Resize and rescale
    """
    s = pd.DataFrame()

    for date in signal.index:
        signals_df = signal.loc[date:date].copy()
        ranked_signals_df = signals_df.rank(axis=1, method='first', ascending=False)
        signals_df[ranked_signals_df > number_positions] = 0
        s = pd.concat([s, signals_df])

    new_signal = s.loc[~s.index.duplicated(keep='first')]
    # Rescale each row so the retained positions sum to size_factor
    new_signal = new_signal.divide(new_signal.sum(axis=1), axis=0)
    return size_factor * new_signal
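In other words, get_conviction_level turns each day's absolute SMA/LMA gap into a percentile rank against the trailing FITTING_WINDOW observations and buckets it into one of five conviction levels (0-4), while get_conviction_signal keeps at most number_positions instruments per day and rescales the surviving weights to sum to size_factor. A standalone illustration of the bucketing step, with hypothetical numbers:

from scipy import stats

trailing_difs = [0.2, 0.5, 0.9, 1.4, 2.0, 2.6, 3.1, 3.8, 4.5, 5.0]  # hypothetical trailing window
todays_dif = 3.0

percentile = stats.percentileofscore(trailing_difs, todays_dif)  # 60.0 for these numbers
conviction = min(4, int(percentile // 20))                       # bucket 3 on the 0-4 scale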
[ ]:
def get_ma(ts, window):
    return ts.rolling(window=window).mean().dropna()


def get_strategy_from_signal(signal,
                             start_date=None,
                             end_date=dtm.date.max,
                             currency=CURRENCY,
                             rebal_freq='1W-Fri',
                             allocation_function=normalize_long_short,
                             ticker=None):
    """
    Convert signal dataframe into investment strategy
    """
    if start_date is None:
        start_date = signal.first_valid_index()

    strategy = sig.SignalStrategy(
        currency=currency,
        start_date=start_date,
        end_date=end_date,
        signal_name=sig.signal_library.from_ts(signal).name,
        rebalance_frequency=rebal_freq,
        allocation_function=allocation_function,
        include_trading_costs=True,
        instrument_mapping=INSTRUMENT_MAPPING,
        convert_long_short_weight=False,
        ticker=f'{ticker} {str(uuid4())[:5]}'  # random suffix so each run gets a unique strategy name
    )
    _ = strategy.history()

    return strategy
[ ]:
def get_ma_dfs(histories, sma, lma):
    """
    Return dataframes of SMA and LMA for all rolling futures strategies,
    caching results in the SMAs and LMAs dictionaries
    """
    print(f'Computing moving averages: SMA={sma}, LMA={lma}')
    sma_df = pd.concat({h: get_ma(histories[h], window=sma) for h in histories}, axis=1)
    lma_df = pd.concat({h: get_ma(histories[h], window=lma) for h in histories}, axis=1)
    sma_df = sma_df.loc[sma_df.index.isin(lma_df.index)]

    if sma not in SMAs:
        SMAs[sma] = sma_df
    if lma not in LMAs:
        LMAs[lma] = lma_df

    return sma_df, lma_df
[ ]:
SMAs = {}
LMAs = {}
SIGNALS = {}

Portfolio Construction#

[ ]:
h = {}

for sma, lma in product(SMA_PERIODS, LMA_PERIODS):
    sma_df, lma_df = get_ma_dfs(histories, sma, lma)

    crossover = 2 * (sma_df > lma_df) - 1   # +1 where the SMA is above the LMA, -1 otherwise
    difs = abs(lma_df - sma_df).dropna()    # absolute gap between the two moving averages

    conviction_level = get_conviction_level(difs)
    conviction_signal = get_conviction_signal(conviction_level)
    crossover = crossover.loc[crossover.index.isin(conviction_signal.index)]

    signal = crossover * conviction_signal
    SIGNALS[(sma, lma)] = signal

    name = f'SMA_{sma}_LMA_{lma}_FIT'
    strategy = get_strategy_from_signal(signal, ticker=name, allocation_function=identity)
    history = strategy.history()
    history.name = None
    h[(sma, lma)] = history

h = pd.DataFrame(h).dropna()
h.columns = list(h.columns)
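On each date, the final signal in the loop above is the element-wise product of the crossover direction and the conviction weights: the conviction signal allocates at most number_positions weights summing to size_factor (0.25 by default), and the crossover term sets each weight's sign. A hypothetical single-date example:

import pandas as pd

crossover_row = pd.Series({'CD': 1, 'EC': -1, 'SF': 1, 'NV': -1})            # +1 if SMA above LMA, else -1
conviction_row = pd.Series({'CD': 0.10, 'EC': 0.05, 'SF': 0.10, 'NV': 0.0})  # hypothetical weights, sum = 0.25
final_row = crossover_row * conviction_row                                   # CD +0.10, EC -0.05, SF +0.10, NV 0.0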
[ ]:
FITTING_SCHEDULE = SchedulePeriodic(
    start_date=START_DATE,
    end_date=END_DATE,
    holidays='NYSE(T) CALENDAR',
    frequency=REBALANCE_CALENDAR,
    bdc=sig.calendar.BDC_FOLLOWING
).all_data_dates()

FITTING_SCHEDULE = [x for x in FITTING_SCHEDULE
                    if pd.Timestamp(x) in h.iloc[FITTING_WINDOW-1:].index]
[ ]:
def get_optimal_parameters(h, method=sharpe_ratio):
    """
    Compute series of optimal params based on FITTING_SCHEDULE and FITTING_WINDOW
    """
    optimal_params = {}
    for d in FITTING_SCHEDULE:
        # Score each (sma, lma) strategy over the trailing window and keep the best
        optimal_params[d] = h.loc[:d].tail(FITTING_WINDOW).apply(method).idxmax()
    return pd.Series(optimal_params)
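get_optimal_parameters therefore picks, at every fitting date, the (SMA, LMA) pair whose strategy had the best Sharpe ratio over the trailing FITTING_WINDOW of NAV observations. A minimal, framework-independent sketch of the same selection logic, using synthetic NAVs and a naive Sharpe estimate standing in for the framework's sharpe_ratio:

import numpy as np
import pandas as pd

def naive_sharpe(nav, periods_per_year=252):
    # Rough annualised Sharpe estimate from a NAV series (illustration only)
    rets = nav.pct_change().dropna()
    return np.sqrt(periods_per_year) * rets.mean() / rets.std()

rng = np.random.default_rng(1)
dates = pd.bdate_range('2018-01-01', periods=600)
navs = pd.DataFrame(
    1000 * np.exp(np.cumsum(rng.normal(0.0002, 0.01, size=(600, 2)), axis=0)),
    index=dates,
    columns=['SMA_5_LMA_20', 'SMA_15_LMA_60'],  # hypothetical candidate strategies
)

window = 252
fitting_dates = dates[window - 1::21]  # roughly monthly refits, for illustration
best_params = pd.Series({d: navs.loc[:d].tail(window).apply(naive_sharpe).idxmax()
                         for d in fitting_dates})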
[ ]:
def get_combined_signal(optimal_params):
    """
    Stitch together the per-parameter signals according to the fitted choices
    """
    s = pd.DataFrame()
    last_date = FITTING_SCHEDULE[0]

    for date, params in optimal_params.iloc[1:].items():
        s = pd.concat([s, SIGNALS[params].loc[last_date:date]], axis=0)
        last_date = date

    s = s.loc[~s.index.duplicated(keep='last')]
    return s
[ ]:
optimal_windows = get_optimal_parameters(h)
combined_signal = get_combined_signal(optimal_windows)
fit_strategy = get_strategy_from_signal(combined_signal, rebal_freq='1BD', allocation_function=identity, ticker='DIRECTIONAL')
fit_strategy_dn = get_strategy_from_signal(combined_signal, rebal_freq='1BD', ticker='DOLLAR_NEUTRAL')

Performance#

[ ]:
VIEWS = [View.SUMMARY_SINGLE, View.ROLLING_PLOTS, View.HISTORY, View.MONTHLY_STATS_HEATMAP]
CASH = sig.CashIndex.from_currency(CURRENCY).history()
[ ]:
report = sig.PerformanceReport(fit_strategy, CASH, views=VIEWS).report()
[ ]:
h['FIT'] = fit_strategy.history()
[ ]:
reindexed = h.dropna().copy()
for c in reindexed:
    reindexed[c] = (reindexed[c] / reindexed[c].iloc[0]) * 1000
[ ]:
reindexed.plot();
plt.legend(loc=2);