Get Item Recommendations

import numpy as np
import pandas as pd

from random_gen import *

def get_rec(utility_matrix, utility_matrix_o, user_list, top_n, uc_assignment=None):

    """Returns the top N item (cluster) recommendations for each user in the user list

            Items whose entry in ``utility_matrix_o`` is non-zero count as already
            rated and are ranked last. Note that NaN compares as non-zero, so
            missing ratings should be filled with 0 beforehand. The inputs are
            never modified.

            Parameters:
                    utility_matrix (numpy.ndarray or pandas.DataFrame): Matrix of utilities
                        for each row/item pairing. Rows are users when ``uc_assignment``
                        is None, otherwise user clusters.
                    utility_matrix_o (array-like): Original utility matrix, before
                        imputation; same shape as ``utility_matrix``
                    user_list (array-like): Positional (0-based) indices of the users to
                        produce recommendations for
                    top_n (int): Number of items to recommend per user
                    uc_assignment (array-like, optional): Cluster assignment of each user,
                        indexed by user position. May be one-dimensional or a
                        single-column table (e.g. a DataFrame); for two-dimensional
                        input the first column is used. Defaults to None, in which
                        case the rows of ``utility_matrix`` are indexed by user directly.

            Returns:
                    df_rec (pandas.DataFrame): One row per entry of ``user_list`` with the
                        columns ``user_id``, ``rank_1`` ... ``rank_<top_n>`` (best first).
                        Ranks are positional column indices stored as floats. When
                        ``uc_assignment`` is None and ``utility_matrix`` is a DataFrame,
                        ``user_id`` and the rank columns hold the DataFrame's index and
                        column labels instead.

            Raises:
                    IndexError: If ``top_n`` exceeds the number of item columns.

    """

    # Labels can only be looked up when predictions arrive as a DataFrame and
    # its rows are addressed by user (not by user cluster).
    use_labels = uc_assignment is None and isinstance(utility_matrix, pd.DataFrame)
    if use_labels:
        user_id_lookup = np.asarray(utility_matrix.index)
        item_id_lookup = np.asarray(utility_matrix.columns)

    # Work on a float copy so the caller's matrix is left untouched
    scores = np.array(utility_matrix, dtype=float)

    # Don't recommend items that are already rated
    scores[np.asarray(utility_matrix_o) != 0] = -np.inf

    # Column indices of every row, ordered from best to worst utility
    ranked = scores.argsort()[:, ::-1]

    users = np.asarray(user_list)
    if users.size == 0:
        # An empty list becomes a float array; indexing needs integers
        users = users.astype(np.intp)

    # Row of the utility matrix that serves each requested user
    if uc_assignment is None:
        rows = users
    else:
        clusters = np.asarray(uc_assignment)
        if clusters.ndim > 1:
            clusters = clusters[:, 0]
        rows = clusters[users]

    # Create recommendation table
    df_rec = pd.DataFrame({'user_id': user_id_lookup[users] if use_labels else users})
    for rank in range(top_n):
        best = ranked[rows, rank]
        if use_labels:
            df_rec['rank_' + str(rank + 1)] = item_id_lookup[best]
        else:
            df_rec['rank_' + str(rank + 1)] = best.astype(float)

    return df_rec

Example

# Example configuration: population size, number of sampled users, cluster
# counts, recommendations per user, and the seed passed to the generators.
n_user = 100
sample_size = 10
n_user_cluster = 5
n_item_cluster = 5
top_n = 3
random_seed = 1

# All user positions 0 .. n_user-1
user_id_list = list(range(n_user))
# The helpers below come from random_gen (not shown here); presumably they
# draw a sample of user indices, a cluster label per user, and a pair of
# (original, imputed) cluster-level utility matrices -- verify against random_gen.
user_list = random_user_list(n_user, sample_size, random_seed)
uc_assignment = random_user_cluster(n_user, n_user_cluster, random_seed)
utility_matrix_o, utility_matrix = random_utility_matrix(n_user_cluster, n_item_cluster, random_seed)
user_list
array([80, 84, 33, 81, 93, 17, 36, 82, 69, 65])
uc_assignment
array([3, 4, 0, 1, 3, 0, 0, 1, 4, 4, 1, 2, 4, 2, 4, 3, 4, 2, 4, 2, 4, 1,
       1, 0, 1, 1, 1, 1, 0, 4, 1, 0, 0, 3, 2, 1, 0, 3, 1, 1, 3, 4, 0, 1,
       3, 4, 2, 4, 0, 3, 1, 2, 0, 4, 1, 2, 2, 1, 0, 1, 3, 4, 3, 1, 3, 0,
       0, 2, 2, 1, 3, 4, 2, 0, 0, 1, 1, 3, 0, 0, 4, 2, 4, 3, 3, 0, 3, 4,
       3, 4, 4, 4, 1, 0, 4, 2, 0, 2, 4, 1])
uc_assignment
array([3, 4, 0, 1, 3, 0, 0, 1, 4, 4, 1, 2, 4, 2, 4, 3, 4, 2, 4, 2, 4, 1,
       1, 0, 1, 1, 1, 1, 0, 4, 1, 0, 0, 3, 2, 1, 0, 3, 1, 1, 3, 4, 0, 1,
       3, 4, 2, 4, 0, 3, 1, 2, 0, 4, 1, 2, 2, 1, 0, 1, 3, 4, 3, 1, 3, 0,
       0, 2, 2, 1, 3, 4, 2, 0, 0, 1, 1, 3, 0, 0, 4, 2, 4, 3, 3, 0, 3, 4,
       3, 4, 4, 4, 1, 0, 4, 2, 0, 2, 4, 1])
uc_assignment[user_list]
array([4, 3, 3, 2, 0, 2, 0, 4, 1, 0])
utility_matrix_o
array([[0., 0., 1., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1.],
       [1., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0.]])
utility_matrix
array([[0.3732, 0.3739, 1.    , 0.5149, 0.1519],
       [0.0812, 1.    , 0.6708, 0.0519, 0.6888],
       [0.6676, 0.1352, 0.0962, 0.1855, 1.    ],
       [1.    , 0.1501, 0.0326, 0.5469, 0.3875],
       [0.8347, 0.9869, 0.871 , 1.    , 0.8348]])
df_rec = get_rec(utility_matrix, utility_matrix_o, user_list, top_n, pd.DataFrame(uc_assignment))
df_rec
user_id rank_1 rank_2 rank_3
0 80 1.0 2.0 4.0
1 84 3.0 4.0 1.0
2 33 3.0 4.0 1.0
3 81 0.0 3.0 1.0
4 93 3.0 1.0 0.0
5 17 0.0 3.0 1.0
6 36 3.0 1.0 0.0
7 82 1.0 2.0 4.0
8 69 4.0 2.0 0.0
9 65 3.0 1.0 0.0

Unit Test

import unittest

class TestGetRec(unittest.TestCase):
    """Checks get_rec on randomly generated cluster-level utility matrices."""

    N_USER = 100
    SAMPLE_SIZE = 10
    N_USER_CLUSTER = 5
    N_ITEM_CLUSTER = 5
    TOP_N = 3

    def _make_inputs(self, random_seed):
        """Builds (user_list, uc_assignment, utility_matrix_o, utility_matrix) for a seed."""
        user_list = random_user_list(self.N_USER, self.SAMPLE_SIZE, random_seed)
        uc_assignment = random_user_cluster(self.N_USER, self.N_USER_CLUSTER, random_seed)
        utility_matrix_o, utility_matrix = random_utility_matrix(
            self.N_USER_CLUSTER, self.N_ITEM_CLUSTER, random_seed
        )
        return user_list, uc_assignment, utility_matrix_o, utility_matrix

    def test_1(self):
        """Seed 1 must reproduce the recommendation table recorded for that seed."""
        user_list, uc_assignment, utility_matrix_o, utility_matrix = self._make_inputs(1)

        df_rec = get_rec(utility_matrix, utility_matrix_o, user_list, self.TOP_N,
                         pd.DataFrame(uc_assignment))

        test_case = np.array([
            [80.,  1.,  2.,  4.],
            [84.,  3.,  4.,  1.],
            [33.,  3.,  4.,  1.],
            [81.,  0.,  3.,  1.],
            [93.,  3.,  1.,  0.],
            [17.,  0.,  3.,  1.],
            [36.,  3.,  1.,  0.],
            [82.,  1.,  2.,  4.],
            [69.,  4.,  2.,  0.],
            [65.,  3.,  1.,  0.]
        ])

        self.assertEqual(df_rec.to_numpy().tolist(), test_case.tolist())

    def test_2(self):
        """Seed 2 must yield, per user, the best-scoring unrated items in order.

        The previous version compared against the table of seed 1 and therefore
        could never pass; the expectation is now derived from the inputs.
        """
        user_list, uc_assignment, utility_matrix_o, utility_matrix = self._make_inputs(2)

        # Snapshot the masked scores before the call so the check does not
        # depend on whether get_rec modifies its input matrix.
        scores = np.array(utility_matrix, dtype=float)
        scores[np.asarray(utility_matrix_o) != 0] = -np.inf

        df_rec = get_rec(utility_matrix, utility_matrix_o, user_list, self.TOP_N,
                         pd.DataFrame(uc_assignment))

        expected_columns = ['user_id'] + ['rank_' + str(k + 1) for k in range(self.TOP_N)]
        self.assertEqual(list(df_rec.columns), expected_columns)
        self.assertEqual(df_rec['user_id'].tolist(), list(user_list))

        for row, user in zip(df_rec.to_numpy(), user_list):
            cluster_scores = scores[uc_assignment[user]]
            items = [int(item) for item in row[1:]]
            # Recommendations are distinct and carry the highest scores, best first
            self.assertEqual(len(set(items)), self.TOP_N)
            self.assertEqual(
                [cluster_scores[item] for item in items],
                sorted(cluster_scores, reverse=True)[:self.TOP_N],
            )

        # Leading rows as reported by an earlier run with this seed
        self.assertEqual(
            df_rec.to_numpy()[:2].tolist(),
            [[83., 2., 1., 4.], [30., 4., 1., 2.]],
        )
        
# Run the tests only when executed directly (notebook cell or script), not on import.
# argv=[''] keeps unittest from parsing the host's command line, and exit=False
# prevents it from calling sys.exit() when the run finishes.
if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
test_1 (__main__.TestGetRec) ... ok
test_2 (__main__.TestGetRec) ... FAIL

======================================================================
FAIL: test_2 (__main__.TestGetRec)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-14-3e80e6adab87>", line 67, in test_2
    self.assertEqual(df_rec.to_numpy().tolist(), test_case.tolist())
AssertionError: Lists differ: [[83.0, 2.0, 1.0, 4.0], [30.0, 4.0, 1.0, 2.0[180 chars]4.0]] != [[80.0, 1.0, 2.0, 4.0], [84.0, 3.0, 4.0, 1.0[181 chars]0.0]]

First differing element 0:
[83.0, 2.0, 1.0, 4.0]
[80.0, 1.0, 2.0, 4.0]

Diff is 802 characters long. Set self.maxDiff to None to see it.

----------------------------------------------------------------------
Ran 2 tests in 0.020s

FAILED (failures=1)
<unittest.main.TestProgram at 0x12de02a9fa0>