以上
本節は、「 NumPyroによるベイズモデリング入門【線形回帰編】 」を引用している。
# Third-party imports and plotting defaults for the linear-regression section.
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import jax
import jax.numpy as jnp
import numpyro

sns.set_style("darkgrid")
$\Large \displaystyle a \sim Normal(0, 100)$
$\Large \displaystyle b \sim Normal(0, 100)$
$\Large \displaystyle \sigma \sim LogNormal(0, 10)$
$\Large \displaystyle y_i \sim Normal(ax_i+b, \sigma)$
def model(x, y, N=100):
    """Bayesian simple linear regression for NumPyro.

    y_i ~ Normal(a * x_i + b, sigma), with wide priors on all parameters.

    Args:
        x: predictor values, shape (N,).
        y: observed responses, shape (N,); pass None to sample from the prior.
        N: number of observations covered by the plate.
    """
    d = numpyro.distributions
    # Weakly-informative Normal(0, 100) priors on the slope a and intercept b.
    a = numpyro.sample("a", d.Normal(loc=jnp.array(0.), scale=jnp.array(100.)))
    b = numpyro.sample("b", d.Normal(loc=jnp.array(0.), scale=jnp.array(100.)))
    # LogNormal prior keeps the noise scale strictly positive.
    sigma = numpyro.sample("sigma", d.LogNormal(0, 10))
    # obs=y ties the likelihood to the observed data (y=None draws new data).
    with numpyro.plate("data", N):
        numpyro.sample("obs", d.Normal(a * x + b, sigma), obs=y)
$\Large \displaystyle y_i \sim Normal(-5x_i+3, 1.0)$
$\Large \displaystyle y_i \sim Normal(ax_i+b, \sigma)$
以上
この記事は「 Numpyroでベイズ統計モデリング~単回帰モデル~ 」を参照しています。
sales,temperature 41.68,13.7 110.99,24 65.32,21.5 72.64,13.4 76.54,28.9 62.76,28.9 :
# Load the beer-sales dataset and eyeball the sales/temperature relationship.
file_bear_sales_2 = pd.read_csv("3-2-1-beer-sales-2.csv")
file_bear_sales_2.head()

plt.figure(figsize=(10, 5))
sns.scatterplot(x="temperature", y="sales", data=file_bear_sales_2)
plt.show()
$\Large \displaystyle sales_i \sim Normal(Intercept+beta*temperature_i,\sigma^2)$
def model(sales, temperature):
    """Simple linear regression of beer sales on temperature.

    sales ~ Normal(Intercept + beta * temperature, sigma), with wide
    Normal priors on the coefficients and a HalfNormal prior on sigma.
    """
    intercept = numpyro.sample("Intercept", dist.Normal(0, 100))
    beta = numpyro.sample("beta", dist.Normal(0, 100))
    sigma = numpyro.sample("sigma", dist.HalfNormal(100))
    mu = intercept + beta * temperature
    numpyro.sample("sales", dist.Normal(mu, sigma), obs=sales)
# Sample the posterior with NUTS: 4 parallel chains,
# 2000 warmup + 2000 kept draws each.
data_dict = {
    "temperature": file_bear_sales_2["temperature"].values,
    "sales": file_bear_sales_2["sales"].values,
}
mcmc = MCMC(
    sampler=NUTS(model),
    num_warmup=2000,
    num_samples=2000,
    num_chains=4,
    chain_method="parallel",
)
mcmc.run(random.PRNGKey(0), **data_dict)
mcmc.print_summary()
この記事は「 Numpyroでベイズ統計モデリング~事後予測分布~ 」を参照しています。
def model(N, sales, temperature):
    """Linear regression with an explicit plate over the N observations.

    The plate lets Predictive generate `sales` for new temperature grids
    of arbitrary length N (with sales=None).
    """
    intercept = numpyro.sample("Intercept", dist.Normal(0, 100))
    beta = numpyro.sample("beta", dist.Normal(0, 100))
    sigma = numpyro.sample("sigma", dist.HalfNormal(100))
    with numpyro.plate("N", N):
        numpyro.sample("sales", dist.Normal(intercept + beta * temperature, sigma), obs=sales)
# Refit the plated model; N is now an explicit model input.
data_dict = {
    "N": len(file_bear_sales_2),
    "temperature": file_bear_sales_2["temperature"].values,
    "sales": file_bear_sales_2["sales"].values,
}
mcmc = MCMC(
    sampler=NUTS(model),
    num_warmup=2000,
    num_samples=2000,
    num_chains=4,
    chain_method="parallel",
)
mcmc.run(random.PRNGKey(0), **data_dict)
mcmc.print_summary()
# Build the posterior predictive distribution of sales on a grid of
# new temperatures (11..30 degrees), then visualize it three ways.
mcmc_samples = mcmc.get_samples()
predictive = numpyro.infer.Predictive(model, mcmc_samples)
temperature_pred = jnp.arange(11, 31)

ppc_samples = predictive(
    random.PRNGKey(0),
    N=len(temperature_pred),
    temperature=temperature_pred,
    sales=None,
)

idata_ppc = az.from_numpyro(mcmc, posterior_predictive=ppc_samples)

# 95% HDI forest plot of predicted sales at each grid temperature.
az.plot_forest(
    idata_ppc.posterior_predictive["sales"],
    var_names=["sales"],
    hdi_prob=0.95,
    combined=True,
    colors='b',
)
plt.show()

# Ridgeline plot contrasting the coldest (index 0) and hottest (index 19)
# grid temperatures.
az.plot_forest(
    idata_ppc.posterior_predictive["sales"][:, :, [0, 19]],
    kind="ridgeplot",
    var_names=["sales"],
    hdi_prob=0.95,
    ridgeplot_overlap=0.9,
    ridgeplot_truncate=False,
    ridgeplot_quantiles=[.5],
    combined=True,
    ridgeplot_alpha=0.5,
    figsize=(8, 5),
    colors='b',
)
plt.show()

# HDI band and predictive mean overlaid on the raw observations.
sales_pred = idata_ppc.posterior_predictive['sales']
az.plot_hdi(temperature_pred, sales_pred, fill_kwargs={'alpha': 0.3})
plt.plot(temperature_pred, sales_pred.mean(axis=0).mean(axis=0), color="orange")
sns.scatterplot(
    x=file_bear_sales_2["temperature"],
    y=file_bear_sales_2["sales"],
    s=50,
    color="gray",
)
plt.show()
以上
本節は、「 NumPyro:ガウス過程 」を引用している。
# Imports and NumPyro runtime configuration for the Gaussian-process section.
import os
import jax
import jax.numpy as jnp
from jax import vmap
from jax import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import numpyro
from numpyro.diagnostics import hpdi
import numpyro.distributions as dist
import numpyro.distributions.constraints as constraints
from numpyro.infer import MCMC, NUTS
from numpyro.infer import Predictive
from numpyro.infer.util import initialize_model
from numpyro.infer import (
    init_to_feasible,
    init_to_median,
    init_to_sample,
    init_to_uniform,
    init_to_value,
)
import arviz as az

az.style.use("arviz-darkgrid")
# This notebook was written against numpyro 0.11.x.
assert numpyro.__version__.startswith("0.11.0")
numpyro.enable_x64(True)
numpyro.set_platform("cpu")
numpyro.set_host_device_count(1)
def RBF(X, Z, var, length, noise, jitter=1.0e-6, include_noise=True):
    """Squared-exponential (RBF) kernel matrix between point sets X and Z.

    k(x, z) = var * exp(-0.5 * ||x/length - z/length||^2), optionally with
    (noise + jitter) added to the diagonal.
    Adapted from:
    https://docs.pyro.ai/en/stable/_modules/pyro/contrib/gp/kernels/isotropic.html#RBF

    Args:
        X: array of shape (n, d).
        Z: array of shape (m, d).
        var: kernel (output) variance.
        length: length scale, scalar or per-dimension.
        noise: observation-noise variance for the diagonal term.
        jitter: small diagonal constant for numerical stability.
        include_noise: if True, add (noise + jitter) * I; only meaningful
            when X and Z are the same points (n == m).

    Returns:
        (n, m) kernel matrix.
    """
    X = jnp.asarray(X)
    Z = jnp.asarray(Z)
    scaled_X = X / length
    scaled_Z = Z / length
    X2 = (scaled_X**2).sum(axis=1, keepdims=True)
    Z2 = (scaled_Z**2).sum(axis=1, keepdims=True)
    XZ = jnp.matmul(scaled_X, scaled_Z.T)
    # Squared distances can dip slightly below zero from floating-point
    # cancellation; clamp at 0. jnp.maximum replaces jnp.clip(r2, a_min=0):
    # the a_min/a_max keywords were deprecated and removed in newer JAX.
    r2 = jnp.maximum(X2 - 2 * XZ + Z2.T, 0.0)
    k = var * jnp.exp(-0.5 * r2)
    if include_noise:
        k += (noise + jitter) * jnp.eye(X.shape[0])
    return k
def model_prior(X):
    """GP prior: one draw of y ~ MVN(0, K_RBF(X, X)) at fixed hyperparameters."""
    K = RBF(X, X, var=1, length=0.2, noise=0.)
    numpyro.sample(
        "y",
        dist.MultivariateNormal(loc=jnp.zeros(X.shape[0]), covariance_matrix=K),
    )


# Draw and plot 20 functions from the GP prior on a dense 1-D grid.
x_sim = np.linspace(-1, 1, 100)[:, None]
rng_key = random.PRNGKey(0)
prior_predictive = Predictive(model_prior, num_samples=20)
prior_predictions = prior_predictive(rng_key, X=x_sim)["y"]

plt.figure(figsize=(6, 4))
for draw in prior_predictions:
    plt.plot(x_sim[:], draw)
def get_data(N=30, sigma_obs=0.15, N_test=400):
    """Create an artificial 1-D regression dataset.

    Builds a smooth nonlinear signal on [-1, 1], adds Gaussian noise
    (deterministic via np.random.seed(0)), and standardizes the targets
    to zero mean / unit variance.

    Returns:
        (X, Y, X_test): training inputs of shape (N,), standardized
        targets of shape (N,), and a test grid of shape (N_test,).
    """
    np.random.seed(0)
    X = jnp.linspace(-1, 1, N)
    signal = X + 0.2 * jnp.power(X, 3.0) + 0.5 * jnp.power(0.5 + X, 2.0) * jnp.sin(4.0 * X)
    Y = signal + sigma_obs * np.random.randn(N)
    Y = (Y - jnp.mean(Y)) / jnp.std(Y)
    assert X.shape == (N,)
    assert Y.shape == (N,)
    X_test = jnp.linspace(-1.3, 1.3, N_test)
    return X, Y, X_test


# Materialize the dataset and add a trailing feature axis for the kernel.
X_, Y, X_test_ = get_data(N=25)
X = X_[:, jnp.newaxis]
X_test = X_test_[:, jnp.newaxis]
def model_marginal_likelihood_GP(X, Y):
    """GP marginal likelihood: Y ~ MVN(0, K_RBF(X, X)).

    The three kernel hyperparameters (variance, length scale, noise)
    get broad LogNormal priors and are inferred by MCMC.
    """
    var = numpyro.sample("kernel_var", dist.LogNormal(0.0, 10.0))
    length = numpyro.sample("kernel_length", dist.LogNormal(0.0, 10.0))
    noise = numpyro.sample("kernel_noise", dist.LogNormal(0.0, 10.0))
    K = RBF(X, X, var, length, noise)
    numpyro.sample(
        "Y",
        dist.MultivariateNormal(loc=jnp.zeros(X.shape[0]), covariance_matrix=K),
        obs=Y,
    )


# Infer the hyperparameters with NUTS; keep every 2nd draw (thinning=2).
rng_key, rng_key_predict = random.split(random.PRNGKey(0))
kernel = NUTS(model_marginal_likelihood_GP, init_strategy=init_to_feasible)
mcmc = MCMC(
    kernel,
    num_warmup=1000,
    num_samples=1000,
    num_chains=1,
    thinning=2,
)
mcmc.run(rng_key, X, Y)
mcmc.print_summary()
samples = mcmc.get_samples()
mean std median 5.0% 95.0% n_eff r_hat kernel_length 0.70 0.23 0.66 0.37 1.00 322.50 1.01 kernel_noise 0.06 0.02 0.06 0.03 0.09 423.82 1.00 kernel_var 2.57 4.00 1.36 0.29 5.31 328.78 1.00 Number of divergences: 0
def predict(rng_key, X, Y, X_test, var, length, noise):
    """GP posterior predictive at X_test for one hyperparameter setting.

    Args:
        rng_key: JAX PRNG key for the predictive draw.
        X, Y: training inputs (n, d) and targets (n,).
        X_test: test inputs (m, d).
        var, length, noise: RBF kernel hyperparameters.

    Returns:
        (mean, sample): the predictive mean of shape (m,), and
        mean + noise, i.e. one draw from the posterior predictive.
    """
    # Kernels between train and test data, etc.
    k_pp = RBF(X_test, X_test, var, length, noise, include_noise=True)
    k_pX = RBF(X_test, X, var, length, noise, include_noise=False)
    k_XX = RBF(X, X, var, length, noise, include_noise=True)
    K_xx_inv = jnp.linalg.inv(k_XX)
    K = k_pp - jnp.matmul(k_pX, jnp.matmul(K_xx_inv, jnp.transpose(k_pX)))
    # Clamp tiny negative predictive variances from round-off before sqrt.
    # jnp.maximum replaces jnp.clip(..., a_min=0.0): the a_min keyword was
    # deprecated and removed in newer JAX releases.
    sigma_noise = jnp.sqrt(jnp.maximum(jnp.diag(K), 0.0)) * jax.random.normal(
        rng_key, X_test.shape[:1]
    )
    mean = jnp.matmul(k_pX, jnp.matmul(K_xx_inv, Y))
    # Return both the mean function and a sample from the posterior predictive
    # for the given set of hyperparameters.
    return mean, mean + sigma_noise


# Vectorize prediction over all posterior hyperparameter samples.
vmap_args = (
    random.split(rng_key_predict, samples["kernel_var"].shape[0]),
    samples["kernel_var"],
    samples["kernel_length"],
    samples["kernel_noise"],
)
means, predictions = vmap(
    lambda rng_key, var, length, noise: predict(
        rng_key, X, Y, X_test, var, length, noise
    )
)(*vmap_args)

mean_prediction = np.mean(means, axis=0)
percentiles = np.percentile(predictions, [5.0, 95.0], axis=0)

# Plot training data, the 90% predictive interval, and the mean prediction.
fig, ax = plt.subplots(figsize=(8, 6), constrained_layout=True)
ax.plot(X.ravel(), Y, "kx")
ax.fill_between(X_test.ravel(), percentiles[0, :], percentiles[1, :], color="lightblue")
ax.plot(X_test.ravel(), mean_prediction, "blue", ls="solid", lw=2.0)
ax.set(xlabel="X", ylabel="Y", title="Mean predictions with 90% CI")
以上
以上