{ "cells": [ { "cell_type": "markdown", "id": "fcb88754", "metadata": {}, "source": [ "# Sampling Data in a Stream\n", "\n", "We start with the general problem of sampling data. We want to be able to select a subset of the stream to make calculations easier, but also have the results be statistically representative of the stream as a whole. One example might be a search engine wanting to answer the question of \"What fraction of the typical user’s queries were repeated over the past month?\". Assume also that due to space constraints, we wish to store only $\\frac{1}{10}$th of stream elements.\n", "\n", "An easy but incorrect approach would be to just generate a random number from 0 to 9 for each incoming data, and only consider data when the random number is equal to 0. However this will give us the wrong answer as for each user we would be drastically reducing the fraction of repeated queries too. If a user has $s$ queries which were done exactly once, and $d$ queries which were done exactly twice, and no queries were done more than 2 times. The correct answer for the fraction of repeated queries for this user is $\\frac{d}{s+d}$. However using the sampling method just detailed, the fraction we will get is $\\frac{d}{10s + 19d}$.\n", "\n", "The right way would be to only consider $\\frac{1}{10}$th of all users, but consider all queries for those randomly selected users. Assume further that the possible list of users is so large that isn't feasible to store which users should have their queries tracked and which users shouldn't. We can instead use a hash function that maps the user to 1 of 10 buckets. If the user hashes to the first bucket, then this user is someone whose queries we would want to track. Note that we do not actually need to save the users into the buckets, we just need to know at runtime which bucket the user goes to using the fast hash function. This saves us from having to keep track of all users in the universal set. " ] }, { "cell_type": "code", "execution_count": null, "id": "32791a83", "metadata": {}, "outputs": [], "source": [ "n = 0\n", "data_list = []\n", "\n", "while n < 1000000:\n", " userId = np.random.choice(userIds)\n", " word = np.random.choice(words)\n", " \n", " data_list.append((userId, word))\n", " n += 1\n", " \n", "data_iterator = iter(data_list)" ] }, { "cell_type": "code", "execution_count": 7, "id": "4dd40469", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Iterations: 100000 Average Repeat Per User: 8.48824378236143e-05\n", "Iterations: 200000 Average Repeat Per User: 0.0004026243221023727\n", "Iterations: 300000 Average Repeat Per User: 0.0003966167804915212\n", "Iterations: 400000 Average Repeat Per User: 0.0004724339706863245\n", "Iterations: 500000 Average Repeat Per User: 0.00086515906940257\n", "Iterations: 600000 Average Repeat Per User: 0.0010044780481139246\n", "Iterations: 700000 Average Repeat Per User: 0.0012795931598440701\n", "Iterations: 800000 Average Repeat Per User: 0.0015721791194151934\n", "Iterations: 900000 Average Repeat Per User: 0.0018303686818008796\n", "Iterations: 1000000 Average Repeat Per User: 0.002072822397857278\n" ] } ], "source": [ "samples = {} \n", "n = 0\n", "\n", "for val in data_iterator:\n", " userId = val[0]\n", " word = val[1]\n", " \n", " #hash to 10 buckets. 
 { "cell_type": "code", "execution_count": 7, "id": "4dd40469", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
  "Iterations: 100000 Average Repeat Per User: 8.48824378236143e-05\n",
  "Iterations: 200000 Average Repeat Per User: 0.0004026243221023727\n",
  "Iterations: 300000 Average Repeat Per User: 0.0003966167804915212\n",
  "Iterations: 400000 Average Repeat Per User: 0.0004724339706863245\n",
  "Iterations: 500000 Average Repeat Per User: 0.00086515906940257\n",
  "Iterations: 600000 Average Repeat Per User: 0.0010044780481139246\n",
  "Iterations: 700000 Average Repeat Per User: 0.0012795931598440701\n",
  "Iterations: 800000 Average Repeat Per User: 0.0015721791194151934\n",
  "Iterations: 900000 Average Repeat Per User: 0.0018303686818008796\n",
  "Iterations: 1000000 Average Repeat Per User: 0.002072822397857278\n" ] } ], "source": [
  "#samples maps each tracked userId to a dict of word -> occurrence count\n",
  "samples = {}\n",
  "n = 0\n",
  "\n",
  "for val in data_iterator:\n",
  "    userId = val[0]\n",
  "    word = val[1]\n",
  "\n",
  "    #hash to 10 buckets. Process only if hash points to first bucket\n",
  "    if hashInt(userId, 10) == 0:\n",
  "        if userId not in samples:\n",
  "            samples[userId] = {}\n",
  "\n",
  "        if word not in samples[userId]:\n",
  "            samples[userId][word] = 0\n",
  "\n",
  "        samples[userId][word] += 1\n",
  "\n",
  "    #Print fraction of repeated queries every 100000 elements\n",
  "    n += 1\n",
  "    if n % 100000 == 0:\n",
  "        fractionRepeats = 0\n",
  "        for userId in samples:\n",
  "            repeats = 0\n",
  "\n",
  "            for word in samples[userId]:\n",
  "                if samples[userId][word] > 1:\n",
  "                    repeats += 1\n",
  "\n",
  "            fractionRepeats += repeats / len(samples[userId])\n",
  "\n",
  "        fractionRepeats /= len(samples)\n",
  "        print(\"Iterations:\", n, 'Average Repeat Per User:', fractionRepeats)" ] },
 { "cell_type": "markdown", "id": "1f700aaf", "metadata": {}, "source": [
  "## Fixed Size Samples\n",
  "\n",
  "For infinite or very large streams, our set of samples will itself eventually become too large for working storage. In such cases we may want to fix a maximum number of samples to keep, and randomly replace older samples with newer ones over time. One technique for doing this is **reservoir sampling**.\n",
  "\n",
  "Let $s$ be the maximum number of samples we want to keep, and $n$ be the number of stream elements seen so far.\n",
  "\n",
  "1. While $n \\le s$, keep every element.\n",
  "2. When a new element arrives and $n > s$: with probability $\\frac{s}{n}$ keep the new element, otherwise discard it.\n",
  "3. If we keep the new element, evict one of the $s$ stored samples chosen uniformly at random.\n",
  "\n",
  "After $n$ elements, the sample contains each element seen so far with probability $\\frac{s}{n}$. This follows by induction: an element already in the reservoir survives the arrival of the $n$-th element with probability $1 - \\frac{s}{n} + \\frac{s}{n} \\cdot \\frac{s-1}{s} = \\frac{n-1}{n}$, so its inclusion probability goes from $\\frac{s}{n-1}$ to exactly $\\frac{s}{n}$. For our user query sample above, it may still be better to think of replacing users rather than their individual queries.\n",
  "\n",
  "We implement the fixed size sampling algorithm detailed above for $s=20$ while going through our *words* list." ] },
 { "cell_type": "code", "execution_count": 8, "id": "cc60a033", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
  "Samples: 20\n",
  "['wrencher', 'wonderfulness', 'Mantuan', 'dimorphic', 'disturbance', 'Sparassis', 'unstow', 'peastone', 'downline', 'manneristically', 'spelterman', 'darned', 'calathiform', 'oblongly', 'excavationist', 'oneirocrit', 'subloreal', 'unspiritedly', 'binomially', 'thioarsenic']\n" ] } ], "source": [
  "import random\n",
  "\n",
  "samples = []\n",
  "maxSamples = 20 #s\n",
  "samplesTracked = 0 #n\n",
  "\n",
  "for w in words:\n",
  "    samplesTracked += 1\n",
  "\n",
  "    #Fill the reservoir until it holds maxSamples elements\n",
  "    if len(samples) < maxSamples:\n",
  "        samples.append(w)\n",
  "    else:\n",
  "        #Keep the new element with probability s/n, evicting a random older sample\n",
  "        if random.random() < (maxSamples / samplesTracked):\n",
  "            randIndex = random.randint(0, maxSamples-1)\n",
  "            samples[randIndex] = w\n",
  "\n",
  "print(\"Samples:\", len(samples))\n",
  "print(samples)" ] }
 ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 5 }