下面对目前所有相位恢复的代码进行整理,详细说明每段代码的原理和功能。
使用 PhaseRetrival 工具包进行相位恢复 验证传播函数正确性 下面是使用 PhaseRetrival 工具包进行相位恢复前的准备工作,使用FECO导出的数据(幅值+相位)模拟衍射,以验证衍射部分代码的正确性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 addpath('E:\组会\其他\实验室历史文件\code\phasepack-matlab-master\solvers' ); addpath('E:\组会\其他\实验室历史文件\code\phasepack-matlab-master\util' ); addpath('E:\组会\其他\实验室历史文件\code\phasepack-matlab-master\initializers' ); global numrows numcols num_fourier_masks masks padding lamda dx;f = 1.65e9 ; lamda = physconst('lightspeed' ) / f; k0 = 2 * pi / lamda; dtheta = pi / 180 ; dphi = pi / 180 ; theta = -pi / 2 :dtheta:pi / 2 ; phi = 0 :dphi:pi ; NF_data1 = 'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\DATAyyq\l_6lamb_n_97_2-5lamb.efe' ; NF_data2 = 'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\DATAyyq\l_6lamb_n_97_3-0lamb.efe' ; NF_data3 = 'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\DATAyyq\l_6lamb_n_97_3-5lamb.efe' ; [Ex1, Ey1, xd1, yd1] = load_NF(NF_data1); [Ex2, Ey2, xd2, yd2] = load_NF(NF_data2); [Ex3, Ey3, xd3, yd3] = load_NF(NF_data3); Ey1_1D = Ey1(:); Ey2_1D = Ey2(:); Ey3_1D = Ey3(:); M = length (xd2); N = length (yd2); dx = (xd2(2 ) - xd2(1 )); dy = (yd2(2 ) - yd2(1 ));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 padding = 2 ; MI = padding * M; NI = padding * N; m = -MI / 2 :1 :MI / 2 - 1 ; n = -NI / 2 :1 :NI / 2 - 1 ; k_X_Rectangular = 2 * pi * m / (MI * dx); k_Y_Rectangular = 2 * pi * n / (NI * dy); dk_X = k_X_Rectangular(2 ) - k_X_Rectangular(1 ); dk_Y = k_Y_Rectangular(2 ) - k_Y_Rectangular(1 ); [k_Y_Rectangular_Grid, k_X_Rectangular_Grid] = meshgrid (k_Y_Rectangular, k_X_Rectangular); fy1 = fftshift(ifft2(Ey2, MI, NI) * MI * NI) * dx * dy; fy_Magnitude = 20 * log10 (abs (fy1));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 kz = sqrt (k0 ^ 2 - k_X_Rectangular_Grid .^ 2 - k_Y_Rectangular_Grid .^ 2 ); W = 0.55 ; H = 0.428 ; w = dx * (M - 1 ); h = dy * (N - 1 ); z0 = 2.5 * lamda; z1 = 3.0 * lamda; z2 = 3.5 * lamda; Reliable_rate = 1 ; thetax_sure = Reliable_rate * atan (0.5 * (w - W) / z1); thetay_sure = Reliable_rate * atan (0.5 * (h - H) / z1); UR = zeros (MI, NI); for i = 1 :MI for j = 1 :NI if ((k_X_Rectangular(i ) * k_X_Rectangular(i ) / ((k0 * sin (thetax_sure)) ^ 2 )) + (k_Y_Rectangular(j ) * k_Y_Rectangular(j ) / (k0 ^ 2 )) < 1.0 && (k_X_Rectangular(i ) * k_X_Rectangular(i ) / (k0 ^ 2 )) + (k_Y_Rectangular(j ) * k_Y_Rectangular(j ) / ((k0 * sin (thetay_sure)) ^ 2 )) < 1.0 ) UR(i , j ) = 1 ; else UR(i , j ) = 0 ; end end end fy_H_Magnitude = 20 * log10 (abs (fy1 .* UR));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 [numrows, numcols] = size (k_Y_Rectangular_Grid); num_fourier_masks = 2 ; masks = zeros (numrows, numcols, num_fourier_masks); dz = [0 0.5 ]; isim = imag (kz) ~= 0 ; kz(isim) = -1 * kz(isim); for i = 1 :num_fourier_masks masks(:, :, i ) = UR .* exp (-1 i * kz * dz(i ) * lamda); end fy0 = fftshift(ifft2(Ey2, MI, NI) * MI * NI) * dx * dy; x = fy0(:); b_cal = abs (A(x)); b_real = [abs (Ey2_1D); abs (Ey3_1D)];
1 2 3 4 5 6 7 b_err_1 = sqrt (sum(abs (abs (b_real) - b_cal) .^ 2 , 'all' ) / sum(b_cal .^ 2 , 'all' )); b_err_2 = abs ((b_real - b_cal) ./ b_cal); b_err = reshape (b_err_2, [numrows / padding, numcols / padding, num_fourier_masks]); E1_err = (squeeze (b_err(:, :, 1 ))); E2_err = (squeeze (b_err(:, :, 2 )));
PhaseRetrival 工具包进行相位恢复 上面已经验证了衍射过程的正确,下面开始使用 PhaseRetrival 工具包进行相位恢复,这个过程是已经集成好的,只需要设置少量参数即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 opts = struct; opts.algorithm = 'RAF' ; opts.initMethod = 'Weighted' ; opts.tol = 1e-6 ; opts.verbose = 1 ; opts.maxIters = 1000 ; fprintf('Running.....\n 算法: %s \n 初始化: %s \n 误差: %s \n 迭代次数: %d \n' , opts.algorithm, opts.initMethod, opts.tol, opts.maxIters); [x, outs, opts] = solvePhaseRetrieval(@A, @At, b_real, numel (k_Y_Rectangular_Grid), opts); recovered_image = reshape (x, numrows, numcols); fprintf('Image recovery required %d iterations (%f secs)\n' , outs.iterationCount, outs.solveTimes(end )); by_retrival = A(recovered_image); by_retrival = reshape (by_retrival, [numrows / padding, numcols / padding, num_fourier_masks]); E2 = (squeeze (by_retrival(:, :, 1 )));
对比远场数据 为验证 PhaseRetrival 工具包的计算结果,使用其计算出的远场方向图与FECO导出的远场进行对比。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 [EPLANE, HPLANE, Etot] = computerFF(theta, phi, Ex2, E2, k0, M, N, dx); [EPLANE2, HPLANE2, Etot2] = computerFF(theta, phi, Ex2, Ey2, k0, M, N, dx); Etot_Size = size (Etot); ffH_err = 20 * log10 (abs ((Etot(1 , :) - Etot2(1 , :)) / max (Etot2(1 , :)))); ffE_err = 20 * log10 (abs ((Etot(floor (Etot_Size(2 ) / 2 ), :)) - Etot2(floor (Etot_Size(2 ) / 2 ), :)) ./ max (Etot2(floor (Etot_Size(2 ) / 2 ), :))); ENL_H = 20 * log10 (mean (abs ((Etot(1 , :) - Etot2(1 , :)) / max (Etot2(1 , :))))); ENL_E = 20 * log10 (mean (abs ((Etot(floor (Etot_Size(2 ) / 2 ), :)) - Etot2(floor (Etot_Size(2 ) / 2 ), :)) ./ max (Etot2(floor (Etot_Size(2 ) / 2 ), :)))); ENL = 20 * log10 (mean (abs ((Etot - Etot2) / max (max (Etot2))), "all" )); far_data = importdata("E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\ffdata.dat" ); xn1 = far_data.data(:, 1 ); yn1 = far_data.data(:, 2 ); yn2 = far_data.data(:, 3 );
辅助函数 下面是 PhaseRetrival 工具包在计算两个面衍射的时候,使用的辅助函数A 和 A的逆函数At。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 function measurements = A (pixels) global numrows numcols num_fourier_masks masks padding dx; dk_X = 2 * pi / (numrows * dx); im = reshape (pixels, [numrows, numcols]); measurements = zeros (numrows / padding, numcols / padding, num_fourier_masks); for m = 1 :num_fourier_masks this_mask = squeeze (masks(:, :, m)); measurements_Padded = (fft2(ifftshift(im .* this_mask))) * dk_X * dk_X / (4 * pi * pi ); measurements(:, :, m) = measurements_Padded(1 :numrows / padding, 1 :numcols / padding); end measurements = measurements(:); end function pixels = At (measurements) global numrows numcols num_fourier_masks masks padding dx; dk_X = 2 * pi / (numrows * dx); im_reconstructed = zeros (numrows, numcols); measurements = reshape (measurements, [numrows / padding, numcols / padding, num_fourier_masks]); for m = 1 :num_fourier_masks this_mask = squeeze (masks(:, :, m)); this_measurements = squeeze (measurements(:, :, m)); im_reconstructed = im_reconstructed + conj (this_mask) .* fftshift(ifft2(this_measurements, numrows, numcols)) * numrows * numcols * dk_X * dk_X / (4 * pi * pi ); end pixels = im_reconstructed(:); end
使用U-Net 网络直接进行相位恢复 下面是最初直接使用tensorflow 搭建 U-Net 网络进行相位恢复的代码
数据读取 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class PlaneData : def __init__ (self, file_name ): abs_temp = [] E_temp = [] phase_temp = [] f = open (file_name) E = f.read() if file_name.split('.' )[-1 ] == 'txt' : E = E.split('\n' )[2 :-1 ] elif file_name.split('.' )[-1 ] == 'efe' : E = E.split('\n' )[16 :-2 ] x = int (np.sqrt(len (E))) y = int (np.sqrt(len (E))) index_dot = file_name.find('-' ) distance_plane = int (file_name[index_dot - 1 ]) + float (file_name[index_dot + 1 ]) * 0.1 main_direction = 'y' for k in E: E_xp, E_yp, E_zp, E_xr, E_xi, E_yr, E_yi, E_zr, E_zi = k.split() E_x = complex (float (E_xr), float (E_xi)) E_y = complex (float (E_yr), float (E_yi)) E_z = complex (float (E_zr), float (E_zi)) if main_direction == 'y' : abs_temp.append(np.abs (E_y)) E_temp.append(E_y) phase = np.angle(E_y) phase_temp.append(phase) elif main_direction == 'x' : abs_temp.append(np.abs (E_x)) E_temp.append(E_x) phase = np.angle(E_x) phase_temp.append(phase) elif main_direction == 'z' : abs_temp.append(np.abs (E_z)) E_temp.append(E_z) phase = np.angle(E_z) phase_temp.append(phase) abs_pro = np.array(abs_temp).reshape((x, y)) E_temp = np.array(E_temp).reshape((x, y)) phase_pro = np.array(phase_temp).reshape((x, y)) self.abs = abs_pro[0 :96 , 0 :96 ] / np.max (abs_pro[0 :96 , 0 :96 ]) self.phase = phase_pro[0 :96 , 0 :96 ] self.E = E_temp[0 :96 , 0 :96 ] / np.max (np.abs (E_temp[0 :96 , 0 :96 ])) self.N = x - 1 self.distance = distance_plane plane_1 = PlaneData(r'DATA\l_6lamb_n_97_2-5lamb.efe' ) plane_2 = PlaneData(r'DATA\l_6lamb_n_97_3-0lamb.efe' ) plane_3 = PlaneData(r'DATA\l_6lamb_n_97_3-2lamb.efe' )
定义变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 dim = plane_1.N jqx = 0 jqy = 0 plane_W = dim plane_H = dim batch_size = 1 lamb = 299792458 / 1.65e9 pixelsize = lamb / 8 N = dim L = N * pixelsize Steps = 5000 LR = 0.01 noise_level = 0
定义网络结构 1 2 3 4 5 6 7 8 9 10 11 with tf.compat.v1.variable_scope('input' ): rand = tf.placeholder(shape=(dim, dim), dtype=tf.float32, name='noise' ) true_measure = tf.constant(plane_2.abs , dtype=tf.float32, name='diffraction' ) inpt = true_measure + rand with tf.compat.v1.variable_scope('inference' ): out = model_Unet.inference(inpt, plane_H, plane_W, batch_size) out = tf.reshape(out, [plane_H, plane_W])
引入物理衍射 1 2 3 out_calculate = Calculate_step.Phy(out, lamb, L, (plane_2.distance - plane_1.distance) * lamb, plane_1.abs , jqx, jqy, plane_H, plane_W)
上面 Calculate_step.Phy 函数用来模拟物理传播过程,与之前使用 PhaseRetrival 工具包进行相位恢复时的物理传播过程保持一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 def Phy (fai_1, lamb, L, Z, E_1_abs, jqx, jqy, plane_H, plane_W ): M = int (fai_1.shape[1 ]) E_abs_1 = E_1_abs E_abs_1 = E_abs_1[jqx:jqx + plane_H, jqy:jqy + plane_W] E_abs_1 = E_abs_1 / np.max (E_abs_1) fai_1 = tf.cast(fai_1, dtype=tf.complex128) fai_1 = 1j * fai_1 U_in0 = tf.exp(fai_1) U_in = tf.multiply(U_in0, E_abs_1) U_in_fft = Myfftshift.fftshift(tf.signal.fft2d(U_in)) U_in_fft = tf.cast(U_in_fft, dtype=tf.complex128) fx = 1 / L x = np.linspace(-M / 2 , M / 2 - 1 , M) fx = (fx * x) [Fx, Fy] = np.meshgrid(fx, fx) k = 2 * math.pi / lamb Ax = 548 Ay = 426 Lx = L * 1000 Ly = L * 1000 Reliable_rate = 1 thea_x = Reliable_rate * np.arctan(((Lx - Ax) / 2 / 2.5 / lamb / 1000 )) thea_y = Reliable_rate * np.arctan(((Ly - Ay) / 2 / 2.5 / lamb / 1000 )) G1 = (1 - lamb * lamb * (Fx * Fx / np.sin(thea_x) / np.sin(thea_x) + Fy * Fy)) G2 = (1 - lamb * lamb * (Fx * Fx + Fy * Fy / np.sin(thea_y) / np.sin(thea_y))) G = (1 - lamb * lamb * (Fx * Fx + Fy * Fy)) H = np.ones_like(G) * 1j for i in range (H.shape[0 ]): for j in range (H.shape[1 ]): if G[i, j] >= 0 : temp = k * (-1 * Z) * np.sqrt(G[i, j]) H[i, j] = np.exp(1j * temp) else : temp = k * (1j * Z) * np.sqrt(1j * G[i, j] * 1j ) H[i, j] = np.exp(-1 * temp) H = tf.cast(H, dtype=tf.complex128) U_out_fft = tf.multiply(U_in_fft, H) U_out = tf.signal.ifft2d(Myfftshift.ifftshift(U_out_fft)) U_out_abs = tf.abs (U_out) temp = tf.reduce_max(tf.reduce_max(U_out_abs)) U_out = tf.abs (U_out) / temp return U_out
神经网络补充 1 2 3 4 5 6 loss_m = tf.compat.v1.losses.mean_squared_error(out_calculate, true_measure) optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=LR) update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
神经网络初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 with tf.control_dependencies(update_ops): train_op = optimizer.minimize(loss_m) saver = tf.compat.v1.train.Saver() with tf.compat.v1.Session() as sess: sess.run(tf.global_variables_initializer()) m_loss = np.zeros([Steps, 1 ]) temp_m_loss = []
网络迭代优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for step in range (Steps): new_rand = np.random.uniform(0 , noise_level, size=(dim, dim)) sess.run([train_op], feed_dict={rand: new_rand}) loss_measure = sess.run(loss_m, feed_dict={rand: new_rand}) m_loss[step] = loss_measure print ('Step:%d Measure loss:%f' % (step, loss_measure)) new_rand = np.random.uniform(0 , 0.0 , size=(dim, dim)) pha_out = sess.run(out, feed_dict={rand: new_rand}).reshape(dim, dim) out_calculate_temp = sess.run(out_calculate, feed_dict={rand: new_rand}).reshape(dim, dim)
使用U-Net 网络源重构进行相位恢复 下面是通过源重构进行相位恢复,把上面物理衍射的物理过程改为格林公式,模拟等效源在空间中的辐射过程。
模型差异 其机器学习相关的大量代码——网络结构、初始化、迭代过程——与上一个保持一致,只是物理过程部分的代码不同,下面只展示不同部分。
1 2 3 4 5 6 7 8 9 sample1_cords = Calculate_step.set_cords(-1 * int (N / 2 ) * dx, int (N / 2 ) * dy, dx, dy, plane_RS.distance * lamda) source_cords = Calculate_step.set_cords(-1 * int (N / 2 ) * dx, int (N / 2 ) * dy, dx, dy, plane_RS.distance * lamda * 0.8 ) g = Calculate_step.green_function(source_cords, sample1_cords, k, dx, dy) out_calculate = tf.abs (g @ out) out_calculate = tf.reshape(out_calculate, [dim, dim])
1 2 3 4 5 6 7 8 9 10 11 12 13 def set_cords (begin, end, dx, dy, Z ): x_d = np.arange(begin, end, dx) y_d = np.arange(begin, end, dy) [x_grid, y_grid] = np.meshgrid(x_d, y_d) z_grid = Z * np.ones_like(x_grid) x_cords = x_grid.reshape(-1 , 1 ) y_cords = y_grid.reshape(-1 , 1 ) z_cords = z_grid.reshape(-1 , 1 ) return_cords = np.hstack((y_cords, x_cords, z_cords)) return return_cords
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def green_function (source_xy, sample_xy, k, dx, dy ): x_source = source_xy[:, 0 ] y_source = source_xy[:, 1 ] z_source = source_xy[:, 2 ] x_sample = sample_xy[:, 0 ] y_sample = sample_xy[:, 1 ] z_sample = sample_xy[:, 2 ] g = np.zeros((len (x_sample), len (x_source)), dtype=complex ) for m in range (len (x_sample)): R_ij = np.sqrt((x_sample[m] - x_source) ** 2 + (y_sample[m] - y_source) ** 2 + (z_sample[m] - z_source) ** 2 ) g[m, :] = 1 / (4 * np.pi) * (np.exp(-1j * k * R_ij) / R_ij ** 2 ) * ((z_sample[m] - z_source) * (1j * k + 1 / R_ij)) * dx * dy return g
使用全连接神经网络进行相位恢复 论文复现 首先,下面是对 PhaseRetrival 工具包中相位恢复相关论文中提到的原理复现。这一部分并不涉及机器学习,是传统的相位恢复算法——使用加权最大相关初始化,并用加权梯度流进行迭代。
按照给定的形状随机生成实数信号(y、A、x)
1 2 3 4 5 6 7 8 9 def generate_data (n, m ): x = np.random.randn(n) x = x / np.linalg.norm(x) A = np.random.randn(m, n) y = np.abs (A @ x) return A, y, x
定义初始化方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def weighted_max_correlation_init (A, y, S_size ): m, n = A.shape S = np.argsort(y)[-S_size:] A_S = A[S, :] y_S = y[S] gamma = 0.5 weights = y_S ** gamma Y = (A_S.T * weights) @ A_S / S_size _, _, Vt = svds(Y, k=1 ) z0 = Vt[0 ] norm_estimate = np.sqrt(np.mean(y ** 2 )) z0 = z0 * norm_estimate return z0
定义优化方法——加权幅流法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def reweighted_amplitude_flow (A, y, z0, max_iter=3000 , mu=0.5 , beta=5 ): m, n = A.shape z = z0.copy() for t in range (max_iter): Az = A @ z Az_abs = np.abs (Az) ratios = Az_abs / y weights = 1 / (1 + beta / ratios) gradient = (A.T @ (weights * (Az - y * (Az / Az_abs)))) / m z = z - mu * gradient if np.linalg.norm(gradient) < 1e-6 : break return z
主函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def main (): n = 1000 m = 2 * n - 1 S_size = int (3 * m / 13 ) A, y, x = generate_data(n, m) z0 = weighted_max_correlation_init(A, y, S_size) z_hat = reweighted_amplitude_flow(A, y, z0) error = np.linalg.norm(z_hat - x) / np.linalg.norm(x) print (f"Recovery error: {error} " )
引入神经网络实现 下面把神经网络引入,根据之前同学提醒,打算使用端对端的神经网络,通过输入A与y,直接得到待求的x。
1 2 3 4 5 6 def generate_data (n, m ): x = np.random.randn(n) A = np.random.randn(m, n) y = np.abs (A @ x) return A, y, x
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class SignalRecoveryModel (nn.Module): def __init__ (self, m, n ): super (SignalRecoveryModel, self).__init__() self.input_size = m * n + m self.fc1 = nn.Linear(self.input_size, 512 ) self.fc2 = nn.Linear(512 , 256 ) self.fc3 = nn.Linear(256 , n) self.relu = nn.ReLU() def forward (self, A, y ): A_flat = A.view(-1 ) y_flat = y.view(-1 ) x_input = torch.cat((A_flat, y_flat), dim=0 ) x_input = x_input.unsqueeze(0 ) x = self.relu(self.fc1(x_input)) x = self.relu(self.fc2(x)) x = self.fc3(x) return x.squeeze(0 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 def main (): n = 97 * 2 m = 2 * n A, y, x = generate_data(n, m) A_tensor = torch.FloatTensor(A) y_tensor = torch.FloatTensor(y) x_tensor = torch.FloatTensor(x) model = SignalRecoveryModel(m, n) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=0.001 ) num_epochs = 1000 for epoch in range (num_epochs): model.train() optimizer.zero_grad() outputs = model(A_tensor, y_tensor) loss = criterion(outputs, x_tensor) loss.backward() optimizer.step() if (epoch + 1 ) % 100 == 0 : print (f'Epoch [{epoch + 1 } /{num_epochs} ], Loss: {loss.item():.4 f} ' ) model.eval () with torch.no_grad(): z_hat = model(A_tensor, y_tensor).numpy() error = np.linalg.norm(z_hat - x) / np.linalg.norm(x) print (f"Recovery error: {error} " )
复数改进 上面的代码只针对实数的情况,但在相位恢复中的数据都是复数,目前的神经网络只支持实数,所以需要对其进行改进,以满足复数的需要,下面是改进后的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 def generate_data (n, m ): x = np.random.randn(n) + 1j * np.random.randn(n) x = x / np.linalg.norm(x) A = np.random.randn(m, n) + 1j * np.random.randn(m, n) y = np.abs (A @ x) return A, y, x
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class ComplexSignalRecoveryModel (nn.Module): def __init__ (self, m, n ): super (ComplexSignalRecoveryModel, self).__init__() self.input_size = 2 * m * n + m self.output_size = 2 * n self.fc1 = nn.Linear(self.input_size, 2048 ) self.fc2 = nn.Linear(2048 , 1024 ) self.fc3 = nn.Linear(1024 , self.output_size) self.relu = nn.ReLU() def forward (self, A_real_flat, A_imag_flat, y_flat ): x_input = torch.cat([A_real_flat, A_imag_flat, y_flat], dim=0 ) x_input = x_input.unsqueeze(0 ) x = self.relu(self.fc1(x_input)) x = self.relu(self.fc2(x)) x = self.fc3(x) x_out = x.squeeze(0 ) x_real = x_out[:self.output_size // 2 ] x_imag = x_out[self.output_size // 2 :] return x_real, x_imag
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 def main (): n = 25 m = 2 * n - 1 A, y, x = generate_data(n, m) x_real_true = np.real(x) x_imag_true = np.imag(x) A_real = np.real(A).flatten() A_imag = np.imag(A).flatten() y_flat = y.flatten() A_real_tensor = torch.FloatTensor(A_real) A_imag_tensor = torch.FloatTensor(A_imag) y_tensor = torch.FloatTensor(y_flat) x_real_tensor = torch.FloatTensor(x_real_true) x_imag_tensor = torch.FloatTensor(x_imag_true) model = ComplexSignalRecoveryModel(m, n) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=0.001 ) num_epochs = 1000 for epoch in range (num_epochs): model.train() optimizer.zero_grad() x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor) loss_real = criterion(x_real_pred, x_real_tensor) loss_imag = criterion(x_imag_pred, x_imag_tensor) total_loss = loss_real + loss_imag total_loss.backward() optimizer.step() if (epoch + 1 ) % 100 == 0 : print (f'Epoch [{epoch + 1 } /{num_epochs} ], Loss: {total_loss.item():.4 f} ' ) model.eval () with torch.no_grad(): x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor) z_hat = x_real_pred.numpy() + 1j * x_imag_pred.numpy() error = np.linalg.norm(z_hat - x) / np.linalg.norm(x) print (f"Recovery error: {error} " )
验证传播函数的正确性 这一部分用来验证传播函数部分代码的正确性,共有两个部分。第一部分是与之前 PhaseRetrival 工具包一样,验证衍射过程的正确性;第二部分是因为在神经网络中需要测量矩阵A的显示表达,所以还需验证其正确性。
首先是衍射过程中函数A和A的逆函数At的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 def A_function (pixels, masks, padding, dx ): numrows, numcols = masks.shape[:2 ] im = pixels.reshape(numrows, numcols) measurements = np.zeros((numrows // padding, numcols // padding, masks.shape[2 ]), dtype=np.complex128) dk_X = 2 * np.pi / (numrows * dx) for m in range (masks.shape[2 ]): masked = im * masks[:, :, m] ft = np.fft.fft2(np.fft.ifftshift(masked)) measurements[:, :, m] = ft[:numrows // padding, :numcols // padding] * (dk_X ** 2 ) / (4 * np.pi ** 2 ) return measurements.ravel(order='F' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def At_function (measurements, numrows, numcols, num_fourier_masks, masks, padding, dx ): dk_X = 2 * np.pi / (numrows * dx) im_reconstructed = np.zeros((numrows, numcols), dtype=complex ) measurements = np.reshape(measurements, (numrows // padding, numcols // padding, num_fourier_masks), order='F' ) for m in range (num_fourier_masks): this_mask = np.squeeze(masks[:, :, m]) this_measurements = np.squeeze(measurements[:, :, m]) im_reconstructed += np.conj(this_mask) * np.fft.fftshift(np.fft.ifft2(this_measurements, s=(numrows, numcols))) * numrows * numcols * dk_X * dk_X / (4 * np.pi * np.pi) pixels = im_reconstructed.ravel(order='F' ) return pixels
下面是测量矩阵A的显示表达函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 def At_function (measurements, numrows, numcols, num_fourier_masks, masks, padding, dx ): dk_X = 2 * np.pi / (numrows * dx) im_reconstructed = np.zeros((numrows, numcols), dtype=complex ) measurements = np.reshape(measurements, (numrows // padding, numcols // padding, num_fourier_masks), order='F' ) for m in range (num_fourier_masks): this_mask = np.squeeze(masks[:, :, m]) this_measurements = np.squeeze(measurements[:, :, m]) im_reconstructed += np.conj(this_mask) * np.fft.fftshift(np.fft.ifft2(this_measurements, s=(numrows, numcols))) * numrows * numcols * dk_X * dk_X / (4 * np.pi * np.pi) pixels = im_reconstructed.ravel(order='F' ) return pixels
功能整合 整合上面提到的功能,在下面的代码中,可以完成三个主要的功能:
1 process = ['train' , 'test' , 'trained_test' ]
'train'
:使用随机生成的数据训练模型
'test'
:不使用神经网络的部分,仅仅验证传播函数的正确性
'trained_test'
:对训练好的模型进行测试,目前是使用重新随机生成的数据进行测试,接下来还会补充使用FECO导出的电场数据进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 if 'train' in process: device = torch.device('cuda' if (torch.cuda.is_available() and FLAG_GPU) else 'cpu' ) A, y, x = generate_data(n, m) file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\test_A.npy" np.save(file_path, A) print ("已创建并保存test_A.npy数据" ) model = ComplexSignalRecoveryModel(m, n).to(device) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=0.001 ) num_epochs = 1000 for epoch in range (num_epochs): model.train() optimizer.zero_grad() A, y, x = generate_data(n, m, A) x_real_true = np.real(x) x_imag_true = np.imag(x) A_real = np.real(A).ravel(order='F' ) A_imag = np.imag(A).ravel(order='F' ) y_flat = y.flatten() A_real_tensor = torch.FloatTensor(A_real).to(device) A_imag_tensor = torch.FloatTensor(A_imag).to(device) y_tensor = torch.FloatTensor(y_flat).to(device) x_real_tensor = torch.FloatTensor(x_real_true).to(device) x_imag_tensor = torch.FloatTensor(x_imag_true).to(device) x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor) loss_real = criterion(x_real_pred, x_real_tensor) loss_imag = criterion(x_imag_pred, x_imag_tensor) total_loss = loss_real + loss_imag total_loss.backward() optimizer.step() if (epoch + 1 ) % 100 == 0 : print (f'Epoch [{epoch + 1 } /{num_epochs} ], Loss: {total_loss.item():.4 f} ' ) model.eval () with torch.no_grad(): x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor) z_hat = x_real_pred.cpu().numpy() + 1j * x_imag_pred.cpu().numpy() error = np.linalg.norm(z_hat - x) / np.linalg.norm(x) error = mean_squared_error(np.real(z_hat), np.real(x)) + mean_squared_error(np.imag(z_hat), np.imag(x)) print (f"Recovery error: {error} " ) model_path = r"E:\PhysenNet_inversion\DATA\process_DATA\complex_signal_model.pth" torch.save(model, model_path) print (f"模型已保存到 {model_path} " )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 if 'test' in process: f = 1.65e9 lamda = constants.speed_of_light / f k0 = 2 * np.pi / lamda dtheta = np.pi / 180 dphi = np.pi / 180 theta = np.arange(-np.pi / 2 , np.pi / 2 + dtheta, dtheta) phi = np.arange(0 , np.pi + dphi, dphi) NF_data1 = r'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\NearField3_10X10P25_0P25.efe' NF_data2 = r'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\NearField4_10X10P25_0P25.efe' NF_data3 = r'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\NearField5_10X10P25_0P25.efe' dz_values = [0 , 1 ] z0, z1, z2 = 3.0 * lamda, 4.0 * lamda, 5.0 * lamda Reliable_rate = 1 Ex1, Ey1, xd1, yd1, z = load_NF(NF_data1) Ex2, Ey2, xd2, yd2, z = load_NF(NF_data2) Ex3, Ey3, xd3, yd3, z = load_NF(NF_data3) Ey1_1D = Ey1.flatten(order='F' ) Ey2_1D = Ey2.flatten(order='F' ) Ey3_1D = Ey3.flatten(order='F' ) M = int (len (xd2)) N = int (len (yd2)) dx = float (xd2[1 ] - xd2[0 ]) dy = float (yd2[1 ] - yd2[0 ]) padding = 1 MI = padding * M NI = padding * N m = np.arange(-MI // 2 , MI // 2 ) n = np.arange(-NI // 2 , NI // 2 ) k_X_Rectangular = 2 * np.pi * m / (MI * dx) k_Y_Rectangular = 2 * np.pi * n / (NI * dy) dk_X = k_X_Rectangular[1 ] - k_X_Rectangular[0 ] dk_Y = k_Y_Rectangular[1 ] - k_Y_Rectangular[0 ] k_Y_Rectangular_Grid, k_X_Rectangular_Grid = np.meshgrid(k_Y_Rectangular, k_X_Rectangular) kz = np.sqrt(k0 ** 2 - k_X_Rectangular_Grid ** 2 - k_Y_Rectangular_Grid ** 2 + 0j ) W = 0.55 H = 0.428 w = dx * (M - 1 ) h = dy * (N - 1 ) thetax_sure = Reliable_rate * np.arctan(0.5 * (w - W) / z1) thetay_sure = Reliable_rate * np.arctan(0.5 * (h - H) / z1) file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\UR.npy" if os.path.isfile(file_path): print ("UR.npy 文件存在" ) UR = np.load(file_path) print ("成功加载 UR.npy 文件" ) else : print ("UR.npy 文件不存在" ) UR = np.zeros((MI, NI), dtype=int ) for i in range (MI): for j in range (NI): condition1 = (k_X_Rectangular[i] ** 2 / (k0 * np.sin(thetax_sure)) ** 2 + k_Y_Rectangular[j] ** 2 / k0 ** 2 ) < 1.0 condition2 = (k_X_Rectangular[i] ** 2 / k0 ** 2 + k_Y_Rectangular[j] ** 2 / (k0 * np.sin(thetay_sure)) ** 2 ) < 1.0 if condition1 and condition2: UR[i, j] = 1 np.save(file_path, UR) print ("已创建并保存UR.npy数据" ) fy1 = np.fft.fftshift(np.fft.ifft2(Ey2, s=(MI, NI))) * MI * NI * dx * dy fy_Magnitude = 20 * np.log10(np.abs (fy1)) fy_H_Magnitude = 20 * np.log10(np.abs (fy1 * UR)) numrows, numcols = k_Y_Rectangular_Grid.shape num_fourier_masks = 2 isim = np.imag(kz) != 0 kz[isim] = -1 * kz[isim] file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\masks.npy" if os.path.isfile(file_path): print ("masks.npy 文件存在" ) masks = np.load(file_path) print ("成功加载 masks.npy 文件" ) else : print ("masks.npy 文件不存在" ) masks = np.zeros((numrows, numcols, num_fourier_masks), dtype=np.complex128) for i in range (num_fourier_masks): phase = -1j * kz * dz_values[i] * lamda masks[:, :, i] = UR * np.exp(phase) np.save(file_path, masks) print ("已创建并保存masks.npy数据" ) fy2 = np.fft.fftshift(np.fft.ifft2(Ey2, s=(MI, NI))) * MI * NI * dx * dy x = fy2.flatten() file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\A.npy" if os.path.isfile(file_path): print ("A.npy 文件存在" ) A = np.load(file_path) print ("成功加载 A.npy 文件" ) else : print ("A.npy 文件不存在" ) A = construct_A_matrix(masks, padding, dx) np.save(file_path, A) print ("已创建并保存A.npy数据" ) b_cal = np.abs (A @ x) b_real = np.concatenate([np.abs (Ey2_1D.ravel(order='F' )), np.abs (Ey3_1D.ravel(order='F' ))]) b_err_1 = np.sqrt(np.sum ((np.abs (b_real) - b_cal) ** 2 ) / np.sum (b_cal ** 2 )) b_err_2 = np.abs ((b_real - b_cal) / b_cal) b_err = b_err_2.reshape((numrows // padding, numcols // padding, num_fourier_masks)) E1_err = np.squeeze(b_err[:, :, 0 ]) E2_err = np.squeeze(b_err[:, :, 1 ])
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 if 'trained_test' in process: device = torch.device('cuda' if (torch.cuda.is_available() and FLAG_GPU) else 'cpu' ) model_path = r"E:\PhysenNet_inversion\DATA\process_DATA\complex_signal_model.pth" if os.path.exists(model_path): print ("发现训练模型,加载模型中..." ) model = torch.load(model_path).to(device) A, y, x = generate_data(n, m) file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\test_A.npy" if os.path.isfile(file_path): print ("test_A.npy 文件存在" ) A = np.load(file_path) print ("成功加载 test_A.npy 文件" ) else : pass A_real = np.real(A).ravel(order='F' ) A_imag = np.imag(A).ravel(order='F' ) y_flat = y.flatten() A_real_tensor = torch.FloatTensor(A_real).to(device) A_imag_tensor = torch.FloatTensor(A_imag).to(device) y_tensor = torch.FloatTensor(y_flat).to(device) model.eval () with torch.no_grad(): x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor) z_hat = x_real_pred.cpu().numpy() + 1j * x_imag_pred.cpu().numpy() error = np.linalg.norm(z_hat - x) / np.linalg.norm(x) error = mean_squared_error(np.real(z_hat), np.real(x)) + mean_squared_error(np.imag(z_hat), np.imag(x)) print (f"Recovery error: {error} " )
频谱域滤波的作用 Spectral Domain Filtering(频谱域滤波) 是一种在频域中对信号进行处理的技术,通过傅里叶变换将信号从时域/空域转换到频域,然后针对特定频率成分进行增强或抑制。常见的操作包括低通滤波(保留低频)、高通滤波(保留高频)或带通滤波(保留特定频段)。
主要作用
去噪与平滑 :抑制高频噪声(如传感器噪声或干扰)。
特征提取 :增强信号中的关键频率成分(如边缘检测中的高频信号)。
带宽约束 :限制信号的频率范围以符合物理系统的实际带宽(如光学系统的衍射极限)。
先验知识嵌入 :在迭代优化中引入物理约束(如支持域、非负性等)。
在电磁场相位恢复中的必要性 相位恢复的目标是从仅有的强度测量(如光强或电场模值)中重建丢失的相位信息(如相干衍射成像、全息术)。其核心挑战在于该问题是病态(ill-posed) 的,即解不唯一或不稳定。频谱域滤波在此过程中至关重要,原因如下:
抑制噪声与伪影
实际测量中噪声通常存在于高频部分(如散粒噪声、热噪声)。
迭代恢复过程中,噪声会被反复放大,导致解偏离真实值。
频谱域滤波(如低通滤波)可有效抑制高频噪声,提升重建质量。
施加物理约束
电磁场传播受限于物理系统带宽(如光学系统的数值孔径限制)。
超出系统截止频率的成分可能是非物理的,需通过滤波去除(如带通滤波匹配系统传递函数)。
稳定迭代算法
相位恢复算法(如Gerchberg-Saxton、Fienup算法)需在空域和频域交替迭代,施加约束。
频谱域滤波作为频域约束的一部分,可防止算法发散,加速收敛。
解决病态性问题
相位恢复的数学本质是求解非线性方程,需额外约束(如稀疏性、平滑性)。
频谱域滤波通过限制解的频域特性,缩小解空间,使问题适定(well-posed)。
典型应用示例 在相干衍射成像(CDI)中:
实验测得衍射图样(频域强度)。
通过傅里叶变换迭代更新相位,每次迭代中:
频域滤波 :限制频率范围与系统带宽一致,去除超出截止频率的成分。
未滤波的迭代可能导致高频噪声主导,最终重建失败。
在相位恢复算法中频谱域滤波的作用 在相位恢复算法(如 Gerchberg-Saxton(GS)算法 和 Fienup算法 )中,“空域和频域交替迭代,施加约束”是核心操作。这一过程通过反复在两个域(空域和频域)之间传递信号,并施加不同的物理约束,逐步逼近真实的相位解。
角域(角谱)的基本概念: 角域 ,通常称为角谱 ,是指通过傅里叶变换将空间分布的电场或光场分解为不同传播方向的平面波的集合。每个平面波的传播方向由对应的空间频率决定,这些方向的角度分布即构成角谱。
平面波的波矢 $k=(k_x,k_y,k_z)$决定了波的传播方向。根据几何关系,传播方向的角度 $θ$(例如相对于z轴的倾斜角)满足:
$$\sin\theta = \frac{\sqrt{k_x^2 + k_y^2}}{k} \quad$$
因此,空间频率 (kx,ky)直接对应传播角度 。傅里叶变换后的每个点 (kx,ky) 对应一个特定方向的平面波分量,其幅度和相位由傅里叶系数给出。
通过傅里叶变换,复杂的波场被分解为无数个不同方向的平面波叠加。角谱即表示这些平面波的权重分布,反映了波场中各个传播方向的重要性。
角域的物理意义
示例说明 假设一束光垂直照射一个狭缝:
空间域 :狭缝处的电场分布$ E(x) $是矩形函数;
傅里叶变换 :得到频域的$ sinc $函数$\tilde{E}(k_x)$,对应角谱;
物理意义 :$sinc$ 函数的零点间隔表明哪些传播角度(空间频率)被抑制,对应衍射图样的暗条纹。
关键公式 :
$$ \text{角谱} \propto \tilde{E}(k_x, k_y), \quad \text{传播角度} \propto \arctan\left(\frac{\sqrt{k_x^2 + k_y^2}}{k_z}\right).$$