Upload files to "/"
Added the Data, fitter, and statistiline_filter scripts that are necessary for working with the model.
This commit is contained in:
parent
0e504c0176
commit
f887c94a70
25
Data.py
Normal file
@@ -0,0 +1,25 @@
import numpy as np
import h5py
from scipy.interpolate import interp1d


class Data:
    def __init__(self, filename, group_key):
        self.filename = filename
        self.datasets = []

        with h5py.File(self.filename, "r") as h5:
            print(f"Attributes in group '{group_key}'")
            grp = h5[group_key]

            self.current = grp["current_raw"][()] / grp.attrs["c_mem"]
            self.current_t = grp["current_t"][()]
            self.ECal = grp.attrs["vrev"]
            self.gGaL = grp.attrs["gmax"]
            self.eid = group_key

    def get_current_slice(self, times: np.ndarray):
        interpolator = interp1d(self.current_t, self.current, fill_value="extrapolate")
        return interpolator(times)
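
# Minimal usage sketch (hypothetical group key; assumes the HDF5 layout read in
# __init__ above: datasets "current_raw" and "current_t", attributes "c_mem",
# "vrev", "gmax" under each group):
#
#   data = Data("ltcc_current.h5", "<group_key>")
#   current = data.get_current_slice(np.linspace(0.0, 0.4, 400))  # times in seconds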
204
fitter.py
Normal file
@@ -0,0 +1,204 @@
import time
import numpy as np
import pylab as plt
import pandas as pd
import re

from numpy.linalg import svd  # used by covcor_from_lsq below
from scipy.optimize import least_squares
from model import Model
from Data import Data


class Fitter:
    def __init__(self, model: Model, data: Data, current_fit_range: tuple = (107, 341)) -> None:
        """
        current_fit_range : tuple (t0, t1); t0 and t1 are the start and stop times between which the current is fitted
        """
        self.model = model
        self.data = data
        self.current_fit_range = current_fit_range
        self.fit_results = {}

        self.tspan = [0, 1000]
        self.dt = 1  # 1.0
        self.time_points = np.arange(*self.tspan, self.dt)

        self.iteration = 0  # least squares iteration counter

        t0, t1 = self.current_fit_range = current_fit_range
        self.current_time_indecies = (t0 <= self.time_points) & (self.time_points <= t1)
        self.measured_current = self.data.get_current_slice(
            self.time_points[self.current_time_indecies] / 1000
        )  # calculations run in ms, but the data were recorded in seconds
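
    # Construction sketch (hypothetical instances): the (107, 341) default
    # window above is in ms, while Data stores time in seconds, hence the
    # /1000 when slicing the measured current.
    #
    #   fit = Fitter(Model, Data("ltcc_current.h5", eid), current_fit_range=(107, 341))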
    def convolve_current(self, current: np.ndarray, tau=1.5):
        if np.abs(tau) < 1e-8:
            return current

        # one-sided exponential kernel with time constant |tau| (in time steps)
        k = np.zeros(current.size)
        k[k.size // 2 :] = np.exp(-np.arange(k.size // 2) / np.abs(tau))
        k /= k.sum()

        # tau > 0 smears the signal forward in time, tau < 0 backward
        if tau > 0:
            return np.convolve(current, k, mode="same")
        else:
            return np.convolve(current, k[::-1], mode="same")
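
    # Effect of the kernel, as a sketch (numpy only): a unit step smeared with
    # tau > 0 rises exponentially instead of jumping, mimicking the recording
    # system's RC low-pass; a negative tau runs the smearing backward in time.
    #
    #   step = np.zeros(1000)
    #   step[500:] = 1.0
    #   smoothed = fitter.convolve_current(step, tau=5.0)  # `fitter` is hypothetical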
    def cost_func(self, parameters: np.ndarray):
        model = self.model()

        gGaL, ECal, K_pc_half, tau_xfer, tau_RC, offset = parameters

        model.ECaL = ECal
        model.gCaL = gGaL
        model.K_pc_half = K_pc_half
        model.tau_xfer = tau_xfer

        model.solve(times=self.time_points)

        _calc_curr = model.calculated_current()
        calculated_current = self.convolve_current(_calc_curr, tau=tau_RC)[self.current_time_indecies] + offset

        res = self.measured_current - calculated_current
        err = np.mean(res**2)  # mean squared error
        self.iteration += 1
        print(self.iteration, parameters.tolist(), "err", err)

        # measured_fluo = self.data.fluo
        # fluo_interplolator = interp1d(self.time_domain, model.calculated_fluo)
        # calculated_fluo = fluo_interplolator(self.data.fluo_time)

        if self.iteration < 0:  # debug plotting, disabled; set e.g. `< 5` to inspect the first iterations
            t = self.time_points[self.current_time_indecies]
            plt.plot(t, _calc_curr[self.current_time_indecies], label="calculated current")
            plt.plot(t, self.measured_current, label="measured current")
            plt.plot(t, calculated_current, label="conv calculated current")
            plt.plot(t, self.measured_current - calculated_current, label="error")
            plt.xlabel("time, ms")
            plt.ylabel("current, pA/pF")
            plt.legend(frameon=False)
            plt.show()
            # exit()

        return res  # , measured_fluo - calculated_fluo)
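
    # Note: cost_func returns the raw residual vector rather than the scalar
    # MSE; scipy.optimize.least_squares squares and sums the residuals itself,
    # and the Jacobian it estimates from them is what covcor_from_lsq consumes.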
    def optimize(self, init_parameters=None):
        t0 = time.time()
        self.iteration = 0
        if init_parameters is None:
            m = self.model()
            K_pc_half = m.K_pc_half
            tau_xfer = m.tau_xfer
            tau_RC = 1.5
            offset = 0

            d = self.data
            init_parameters = np.array([d.gGaL, d.ECal, K_pc_half, tau_xfer, tau_RC, offset])
            print(init_parameters.tolist())

        # one (lower, upper) pair per parameter: [gGaL, ECal, K_pc_half, tau_xfer, tau_RC, offset]
        bounds = (
            (0.01, 10, 0.1, 0.1, -5, -10),
            (10, 100, 100, 1, 10, 10),
        )

        res = least_squares(self.cost_func, init_parameters, bounds=bounds, xtol=1e-10)
        print()
        print("   Parameters: [gGaL, ECal, K_pc_half, tau_xfer, tau_RC, offset]")
        print("      Initial:", init_parameters.tolist())
        print("    Optimized:", res.x.tolist())
        print(" Optim status:", res.status)
        print("Optim message:", res.message)

        gGaL, ECal, K_pc_half, tau_xfer, tau_RC, offset = res.x
        err = np.mean(res.fun**2)  # mean squared error of the final residuals

        self.fit_results.update({
            'gGaL': gGaL,
            'ECal': ECal,
            'K_pc_half': K_pc_half,
            'tau_xfer': tau_xfer,
            'tau_RC': tau_RC,
            'offset': offset,
            'mean_squared_error': err,
        })

        model = self.model()

        model.ECaL = ECal
        model.gCaL = gGaL
        model.K_pc_half = K_pc_half
        model.tau_xfer = tau_xfer

        model.solve(times=self.time_points)

        _calc_curr = model.calculated_current()
        calculated_current = self.convolve_current(_calc_curr, tau=tau_RC) + offset
        print("Elapsed time:", time.time() - t0)

        fig = plt.figure(figsize=(24, 12))
        ax1 = fig.add_subplot(121)
        ax2 = fig.add_subplot(122)
        ax1.plot(1000 * self.data.current_t, self.data.current, label="Measured")
        ax1.plot(self.time_points, calculated_current, label="Calculated")
        ax1.set_xlabel("time, ms")
        ax1.set_ylabel("current, pA/pF")
        ax1.legend(frameon=False)

        tp = self.time_points[self.current_time_indecies]
        ax2.plot(tp, self.measured_current, label="Measured")
        ax2.plot(tp, calculated_current[self.current_time_indecies], label="Calculated")
        ax2.set_xlabel("time, ms")
        ax2.set_ylabel("current, pA/pF")
        ax2.legend(frameon=False)
        return res, fig

def covcor_from_lsq(res):
    # covariance from the Jacobian at the solution, via SVD with small singular
    # values discarded
    _, s, VT = svd(res.jac, full_matrices=False)
    threshold = np.finfo(float).eps * max(res.jac.shape) * s[0]
    s = s[s > threshold]
    VT = VT[: s.size]
    cov = np.dot(VT.T / s**2, VT)

    std = np.sqrt(np.diag(cov))
    cor = cov / np.outer(std, std)
    cor[cov == 0] = 0

    return cov, cor

def plot_correlation_matrix(cor):
    plt.imshow(cor, cmap='viridis', interpolation='nearest')
    plt.colorbar(label='Correlation')
    plt.title('Correlation Matrix')
    plt.xlabel('Variables')
    plt.ylabel('Variables')
    plt.show()

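# Usage sketch (names as defined above; `res` is the least_squares result
# returned by Fitter.optimize):
#
#   res, fig = fit.optimize()
#   cov, cor = covcor_from_lsq(res)
#   plot_correlation_matrix(cor)
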
if __name__ == "__main__":
    filename = "ltcc_current.h5"
    eid = "0033635a51b096dc449eb9964e70443a67fc16b9587ae3ff6564eea1fa0e3437_2018.06.18 14:48:40"

    data = Data(filename, eid)

    fit = Fitter(Model, data)
    res, fig = fit.optimize()

    fit_hist = pd.DataFrame.from_dict(fit.fit_results, orient='index').T
    fit_hist.index.name = 'Iterations'

    res_filename = f"fit_results_{eid}.csv"
    res_filename = res_filename.replace(" ", "_").replace(":", "-")
    fit_hist.to_csv(res_filename, index=True)

    eid_cleaned = re.sub(r'[^\w.-]', '', eid)  # strip everything except word characters, hyphens, and periods
    fig.savefig(f"plot_{eid_cleaned}.png")
    fig.savefig(f"plot_{eid_cleaned}.pdf")

    # plot_filename = "fit_plot"
    # fig.savefig(f"{plot_filename}.png")
    # fig.savefig(f"{plot_filename}.pdf")
    fig.savefig("naidis_fit.pdf")

    plt.show()
192
statistiline_filter.py
Normal file
@@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-

#%% import

import h5py
import os
import pandas as pd
import statsmodels.api as sm

from statsmodels.formula.api import ols
from scipy.stats import f
from scipy.stats import shapiro

h5_file = 'ltcc_current.h5'

sobiv_eid_list = []
ttx_eid_list = []
teised_eid_list = []

with h5py.File(h5_file, 'r') as h5:

    for eid in h5.keys():
        if 'tag' in h5[eid].attrs:
            tag_val = h5[eid].attrs['tag']

            # I have to admit ChatGPT's suggestion helped here :/ it was a very nasty thing to debug
            if isinstance(tag_val, bytes):
                tag_val = tag_val.decode('utf-8')

            puhas_eid = eid.replace(" ", "_").replace(":", "-")
            fit_result_eid = "fit_results_" + puhas_eid

            if tag_val == 'iso':
                sobiv_eid_list.append(fit_result_eid)
            elif tag_val == 'ttx':
                ttx_eid_list.append(fit_result_eid)
            else:
                teised_eid_list.append(fit_result_eid)
            #for attr_key, attr_value in h5[eid].attrs.items():
                #print(f"Attribute: {attr_key}, Value: {attr_value}")
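
# Worked example of the normalization above (hypothetical eid): a group key like
# "abc 2018.06.18 14:48:40" becomes "fit_results_abc_2018.06.18_14-48-40",
# which matches the CSV file names written by fitter.py.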
#%%

file = 'ltcc_current.h5'

with h5py.File(file, 'r') as h5_file:
    for eid in h5_file.keys():
        puhastatud_eid = eid.replace(" ", "_").replace(":", "-")

        atribuudid = h5_file[eid].attrs
        sex = atribuudid.get('sex')
        spid = atribuudid.get('spid')

        csv_file_name = f"fit_results_{puhastatud_eid}.csv"

        if os.path.exists(csv_file_name):
            df = pd.read_csv(csv_file_name)

            df['sex'] = sex
            df['spid'] = spid.replace("Mouse AGAT", "")
            df['eid'] = eid

            df.to_csv(csv_file_name, index=False)

for fail in os.listdir():
    if fail.endswith('.csv'):

        eksperiment_id = fail.replace(".csv", "")

        df = pd.read_csv(fail)

        if eksperiment_id in sobiv_eid_list:
            df['tag'] = 'iso'
        elif eksperiment_id in ttx_eid_list:
            df['tag'] = 'ttx'
        else:
            df['tag'] = 'teised'

        df.to_csv(fail, index=False)
#%%

comb_df = pd.DataFrame()

for filename in os.listdir():

    if filename.endswith('.csv'):
        df = pd.read_csv(filename)

        if 'tag' in df.columns and df['tag'].isin(['iso', 'ttx']).all():

            comb_df = pd.concat([comb_df, df], ignore_index=True)

print(comb_df)
#%% counts, to determine which type of ANOVA to run using the statsmodels package

sex_counts = comb_df['sex'].value_counts()

spid_counts = comb_df['spid'].value_counts()

tag_counts = comb_df['tag'].value_counts()


print(sex_counts, spid_counts, tag_counts)

#%% normality check
tau_xfer = comb_df['tau_xfer']

stat, p = shapiro(tau_xfer)

alpha = 0.05
if p > alpha:
    print("The data are normally distributed (fail to reject the null hypothesis)")
else:
    print("The data are not normally distributed (reject the null hypothesis)")
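
# shapiro returns the test statistic W and a p-value; as a quick example,
# p = 0.30 > alpha would mean "fail to reject normality", while p = 0.01
# would mean "reject it".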

#%% ANOVA 2 WAY

comb_df['spid'] = comb_df['spid'].astype('category')
comb_df['sex'] = comb_df['sex'].astype('category')
comb_df['tag'] = comb_df['tag'].astype('category')

model = ols('tau_xfer ~ C(sex) + C(spid) + C(tag)', data=comb_df).fit()

anova_table = sm.stats.anova_lm(model, typ=2)
print(anova_table)
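
#%% sketch: compare each factor's F value against its critical value directly
# (assumption: anova_table has the default statsmodels layout with 'df' and 'F'
# columns and a 'Residual' row; this would replace the hard-coded degrees of
# freedom below)
#
# df_resid = anova_table.loc['Residual', 'df']
# for factor in ['C(sex)', 'C(spid)', 'C(tag)']:
#     F_crit = f.ppf(1 - 0.05, anova_table.loc[factor, 'df'], df_resid)
#     print(factor, "F =", anova_table.loc[factor, 'F'], "F_crit =", F_crit)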

#%% critical value

df_between_groups = 2  # degrees of freedom between groups
df_within_groups = 67  # degrees of freedom within groups (residual)

alpha = 0.05

critical_f = f.ppf(1 - alpha, df_between_groups, df_within_groups)

print("Critical F-value:", critical_f)


#%% critical value

df_between_groups = 2  # degrees of freedom between groups (sex and spid)
df_within_groups = 36  # degrees of freedom within groups (residual)

alpha = 0.05

critical_f = f.ppf(1 - alpha, df_between_groups, df_within_groups)

print("Critical F-value:", critical_f)

"""
|
||||
#%%grupeeringud
|
||||
|
||||
groups = comb_df.groupby('tag')
|
||||
|
||||
iso_group = groups.get_group('iso')
|
||||
ttx_group = groups.get_group('ttx')
|
||||
|
||||
print("ISO grupp:", iso_group)
|
||||
print("nTTX grupp:", ttx_group)
|
||||
|
||||
#%% ANOVA jaoks on grupid piisavalt suured,
|
||||
#allikas https://support.minitab.com/en-us/minitab/help-and-how-to/statistical-modeling/anova/how-to/one-way-anova/before-you-start/data-considerations/
|
||||
|
||||
f_statistic, p_value = f_oneway(iso_group['tau_xfer'], ttx_group['tau_xfer'])
|
||||
|
||||
#%% f_crit control
|
||||
|
||||
dfn = 2-1 #2 gruppi, miinus 1, hetkel ei lahendanud seda vaga automatiseeritult
|
||||
dfd = len(iso_group) + len(ttx_group) - 2 #ka ei lahendanud automatiseeritult
|
||||
|
||||
alpha = 0.05
|
||||
|
||||
f_crit = f.ppf(1 - alpha, dfn, dfd)
|
||||
|
||||
#%% Väljastame tulemused
|
||||
|
||||
print("F-statistika:", f_statistic)
|
||||
print("P-väärtus:", p_value)
|
||||
print("F-kriitiline:", f_crit)
|
||||
"""
|