下面对目前所有相位恢复的代码进行整理,详细说明每段代码的原理和功能。

使用 PhaseRetrival 工具包进行相位恢复

验证传播函数正确性

下面是使用 PhaseRetrival 工具包进行相位恢复前的准备工作,使用FECO导出的数据(幅值+相位)模拟衍射,以验证衍射部分代码的正确性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
%% 导入使用的相位恢复包
addpath('E:\组会\其他\实验室历史文件\code\phasepack-matlab-master\solvers');
addpath('E:\组会\其他\实验室历史文件\code\phasepack-matlab-master\util');
addpath('E:\组会\其他\实验室历史文件\code\phasepack-matlab-master\initializers');
global numrows numcols num_fourier_masks masks padding lamda dx;

%% 参数初始化
f = 1.65e9;
lamda = physconst('lightspeed') / f;
k0 = 2 * pi / lamda;
dtheta = pi / 180;
dphi = pi / 180;
theta = -pi / 2:dtheta:pi / 2;
phi = 0:dphi:pi;

%% 导入FECO仿真得到的电场原始数据作为真值
NF_data1 = 'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\DATAyyq\l_6lamb_n_97_2-5lamb.efe';
NF_data2 = 'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\DATAyyq\l_6lamb_n_97_3-0lamb.efe';
NF_data3 = 'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\DATAyyq\l_6lamb_n_97_3-5lamb.efe';

%% 根据导入的数据,自动处理,只保留电场的主方向(y方向),因为工具包只能输入一维向量,所以这里将电场排成一列,并取得采样面上的点样点数和采样间隔
[Ex1, Ey1, xd1, yd1] = load_NF(NF_data1);
[Ex2, Ey2, xd2, yd2] = load_NF(NF_data2);
[Ex3, Ey3, xd3, yd3] = load_NF(NF_data3);
Ey1_1D = Ey1(:);
Ey2_1D = Ey2(:);
Ey3_1D = Ey3(:);

M = length(xd2); % x方向采样点数
N = length(yd2); % y方向采样点数
dx = (xd2(2) - xd2(1)); % x方向采样间隔
dy = (yd2(2) - yd2(1)); % y方向采样间隔
image-20250407101206002
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
%% 对原始数据进行扩展,在电场外围补0,增加谱域的分辨率,边长扩展为原来的两倍
padding = 2;
MI = padding * M;
NI = padding * N;
m = -MI / 2:1:MI / 2 - 1;
n = -NI / 2:1:NI / 2 - 1;

k_X_Rectangular = 2 * pi * m / (MI * dx);
k_Y_Rectangular = 2 * pi * n / (NI * dy);
dk_X = k_X_Rectangular(2) - k_X_Rectangular(1);
dk_Y = k_Y_Rectangular(2) - k_Y_Rectangular(1);
[k_Y_Rectangular_Grid, k_X_Rectangular_Grid] = meshgrid(k_Y_Rectangular, k_X_Rectangular);

%% 计算第一个面电场(幅值+相位)的角谱,并绘制
fy1 = fftshift(ifft2(Ey2, MI, NI) * MI * NI) * dx * dy;
fy_Magnitude = 20 * log10(abs(fy1));
image-20250407102315210
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
%% 频谱域滤波
kz = sqrt(k0 ^ 2 - k_X_Rectangular_Grid .^ 2 - k_Y_Rectangular_Grid .^ 2);

%% 导入天线口径尺寸、测量面尺寸、测量面距离
W = 0.55;
H = 0.428; %天线孔径尺寸

w = dx * (M - 1); % x方向扫描区域的长度[m]
h = dy * (N - 1); % y方向扫描区域的长度[m]

z0 = 2.5 * lamda; % 扫描面位置
z1 = 3.0 * lamda; % 参考平面位置
z2 = 3.5 * lamda;

%% 设置可信比例,需要同时实验测试出最佳数值,不同案例的数值不同,一般在0.8~1.0之间
Reliable_rate = 1;

%% 计算可信角度
thetax_sure = Reliable_rate * atan(0.5 * (w - W) / z1);
thetay_sure = Reliable_rate * atan(0.5 * (h - H) / z1);

%% 计算频谱域滤波函数
UR = zeros(MI, NI);

for i = 1:MI
for j = 1:NI
if ((k_X_Rectangular(i) * k_X_Rectangular(i) / ((k0 * sin(thetax_sure)) ^ 2)) + (k_Y_Rectangular(j) * k_Y_Rectangular(j) / (k0 ^ 2)) < 1.0 && (k_X_Rectangular(i) * k_X_Rectangular(i) / (k0 ^ 2)) + (k_Y_Rectangular(j) * k_Y_Rectangular(j) / ((k0 * sin(thetay_sure)) ^ 2)) < 1.0)
UR(i, j) = 1;
else
UR(i, j) = 0;
end
end
end

%% 计算天线的可信波谱(频谱域滤波函数与电场角谱相乘),并绘制
fy_H_Magnitude = 20 * log10(abs(fy1 .* UR));
image-20250407102657234
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
%% 创建‘num_fourier_mask ’傅里叶掩码,即矩阵A,实现物理衍射过程。
[numrows, numcols] = size(k_Y_Rectangular_Grid); % 设置掩码尺寸

num_fourier_masks = 2; % 设置衍射面数(原始面、衍射面)
masks = zeros(numrows, numcols, num_fourier_masks);

dz = [0 0.5];

isim = imag(kz) ~= 0; % k圆的外部
kz(isim) = -1 * kz(isim); % yyq - 修正k圆的外部衍射公式 -1i 改为 -1

for i = 1:num_fourier_masks
% i 为 1 时,mask 为 全 1 矩阵,即不改变原始面
% i 为 2 时,mask 为 衍射矩阵,即通过第一个面的角谱计算得到第二个面
masks(:, :, i) = UR .* exp(-1i * kz * dz(i) * lamda);
end

%% 验证计算结果,使用原始数据信息直接获取幅相
fy0 = fftshift(ifft2(Ey2, MI, NI) * MI * NI) * dx * dy;
x = fy0(:); % 将 角谱信号 转换为一维向量,以便PhasePack可以处理它

b_cal = abs(A(x)); % 函数A —— 将 谱域x 使用 傅里叶变换 转换成 场
b_real = [abs(Ey2_1D); abs(Ey3_1D)];
image-20250407103208504
1
2
3
4
5
6
7
% 计算两个面分别与真实值之间的误差
b_err_1 = sqrt(sum(abs(abs(b_real) - b_cal) .^ 2, 'all') / sum(b_cal .^ 2, 'all'));
b_err_2 = abs((b_real - b_cal) ./ b_cal);

b_err = reshape(b_err_2, [numrows / padding, numcols / padding, num_fourier_masks]);
E1_err = (squeeze(b_err(:, :, 1)));
E2_err = (squeeze(b_err(:, :, 2)));
image-20250407103324787 image-20250407103342730

PhaseRetrival 工具包进行相位恢复

上面已经验证了衍射过程的正确,下面开始使用 PhaseRetrival 工具包进行相位恢复,这个过程是已经集成好的,只需要设置少量参数即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
%% 设置PhasePack选项
opts = struct; % 创建一个空结构体来存储选项
opts.algorithm = 'RAF'; % 求解方法 RAF
opts.initMethod = 'Weighted'; % 求解器生成初始起点方法 Weighted
opts.tol = 1e-6; % 容差 - 将其变小以获得更精确的解决方案,或将其增大以获得更快的运行时间
opts.verbose = 1; % 求解器运行时输出大量信息(将此设置为1或0以减少输出)
opts.maxIters = 1000;
% 这里还有更多的选项,比如:opts.searchMethod = 'LBFGS'、initSpectral、initOrthogonal、initWeighted、initOptimalSpectral,具体内容还未了解

%% 运行相位恢复算法
fprintf('Running.....\n 算法: %s \n 初始化: %s \n 误差: %s \n 迭代次数: %d \n', opts.algorithm, opts.initMethod, opts.tol, opts.maxIters);
% 使用测量算子“A”、伴随算子“At”、测量值“b”、要恢复的信号长度和选项来调用求解器。
% 注意,此方法可以接受函数句柄或矩阵作为测量操作符。这里,我们使用函数句柄,因为我们依赖FFT来快速完成任务。如果A是函数句柄,则必须提供At和n
[x, outs, opts] = solvePhaseRetrieval(@A, @At, b_real, numel(k_Y_Rectangular_Grid), opts);

% 将矢量输出转换回二维图像
recovered_image = reshape(x, numrows, numcols);

%打印一些有用的信息到控制台
fprintf('Image recovery required %d iterations (%f secs)\n', outs.iterationCount, outs.solveTimes(end));

by_retrival = A(recovered_image); % 函数A —— 将 谱域x 使用 傅里叶变换 转换成 场
by_retrival = reshape(by_retrival, [numrows / padding, numcols / padding, num_fourier_masks]);
E2 = (squeeze(by_retrival(:, :, 1))); % yyq - 更正远场变量命名方式,E1 改为 E2
image-20250407104118383 image-20250407104318928

对比远场数据

为验证 PhaseRetrival 工具包的计算结果,使用其计算出的远场方向图与FECO导出的远场进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
% 计算远场方向图
[EPLANE, HPLANE, Etot] = computerFF(theta, phi, Ex2, E2, k0, M, N, dx); % yyq - 更正计算远场的电场平面,E1 改为 E2
[EPLANE2, HPLANE2, Etot2] = computerFF(theta, phi, Ex2, Ey2, k0, M, N, dx);

% 远场误差计算
Etot_Size = size(Etot);

ffH_err = 20 * log10(abs((Etot(1, :) - Etot2(1, :)) / max(Etot2(1, :))));
ffE_err = 20 * log10(abs((Etot(floor(Etot_Size(2) / 2), :)) - Etot2(floor(Etot_Size(2) / 2), :)) ./ max(Etot2(floor(Etot_Size(2) / 2), :)));

ENL_H = 20 * log10(mean(abs((Etot(1, :) - Etot2(1, :)) / max(Etot2(1, :)))));
ENL_E = 20 * log10(mean(abs((Etot(floor(Etot_Size(2) / 2), :)) - Etot2(floor(Etot_Size(2) / 2), :)) ./ max(Etot2(floor(Etot_Size(2) / 2), :))));
ENL = 20 * log10(mean(abs((Etot - Etot2) / max(max(Etot2))), "all"));

% 读取仿真软件仿真的远场方向图
far_data = importdata("E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\ffdata.dat");
xn1 = far_data.data(:, 1);
yn1 = far_data.data(:, 2);
yn2 = far_data.data(:, 3);
image-20250407104225163

辅助函数

下面是 PhaseRetrival 工具包在计算两个面衍射的时候,使用的辅助函数A 和 A的逆函数At。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
% 函数A —— 将 谱域x 使用 傅里叶变换 转换成 场
function measurements = A(pixels)
global numrows numcols num_fourier_masks masks padding dx;

dk_X = 2 * pi / (numrows * dx);

% 重构方法将迭代存储为向量,因此该函数需要接受一个向量作为输入,恢复为2维图像
im = reshape(pixels, [numrows, numcols]);

measurements = zeros(numrows / padding, numcols / padding, num_fourier_masks);
% 将 输入 进行 傅里叶变换 转换成 场
for m = 1:num_fourier_masks
this_mask = squeeze(masks(:, :, m));
measurements_Padded = (fft2(ifftshift(im .* this_mask))) * dk_X * dk_X / (4 * pi * pi);
measurements(:, :, m) = measurements_Padded(1:numrows / padding, 1:numcols / padding);
end

measurements = measurements(:);
end

function pixels = At(measurements)
global numrows numcols num_fourier_masks masks padding dx;

dk_X = 2 * pi / (numrows * dx);

im_reconstructed = zeros(numrows, numcols);
measurements = reshape(measurements, [numrows / padding, numcols / padding, num_fourier_masks]);

for m = 1:num_fourier_masks
this_mask = squeeze(masks(:, :, m));
this_measurements = squeeze(measurements(:, :, m));
im_reconstructed = im_reconstructed + conj(this_mask) .* fftshift(ifft2(this_measurements, numrows, numcols)) * numrows * numcols * dk_X * dk_X / (4 * pi * pi);
end

pixels = im_reconstructed(:);
end

使用U-Net 网络直接进行相位恢复

下面是最初直接使用tensorflow 搭建 U-Net 网络进行相位恢复的代码

数据读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 读取 FECO 导出的电场数据,与上面 PhaseRetrival 不同的是,这里输入和输出均为二维图像,不需要变为一维进行处理
class PlaneData:
def __init__(self, file_name):
abs_temp = []
E_temp = []
phase_temp = []
f = open(file_name)
E = f.read()
if file_name.split('.')[-1] == 'txt':
E = E.split('\n')[2:-1]
elif file_name.split('.')[-1] == 'efe':
E = E.split('\n')[16:-2]

x = int(np.sqrt(len(E)))
y = int(np.sqrt(len(E)))

index_dot = file_name.find('-')
distance_plane = int(file_name[index_dot - 1]) + float(file_name[index_dot + 1]) * 0.1

main_direction = 'y'

for k in E:
E_xp, E_yp, E_zp, E_xr, E_xi, E_yr, E_yi, E_zr, E_zi = k.split()
E_x = complex(float(E_xr), float(E_xi))
E_y = complex(float(E_yr), float(E_yi))
E_z = complex(float(E_zr), float(E_zi))

if main_direction == 'y':
abs_temp.append(np.abs(E_y))
E_temp.append(E_y)
phase = np.angle(E_y)
phase_temp.append(phase)
elif main_direction == 'x':
abs_temp.append(np.abs(E_x))
E_temp.append(E_x)
phase = np.angle(E_x)
phase_temp.append(phase)
elif main_direction == 'z':
abs_temp.append(np.abs(E_z))
E_temp.append(E_z)
phase = np.angle(E_z)
phase_temp.append(phase)

abs_pro = np.array(abs_temp).reshape((x, y))
E_temp = np.array(E_temp).reshape((x, y))
phase_pro = np.array(phase_temp).reshape((x, y))

self.abs = abs_pro[0:96, 0:96] / np.max(abs_pro[0:96, 0:96])
self.phase = phase_pro[0:96, 0:96]
self.E = E_temp[0:96, 0:96] / np.max(np.abs(E_temp[0:96, 0:96]))
self.N = x - 1
self.distance = distance_plane

plane_1 = PlaneData(r'DATA\l_6lamb_n_97_2-5lamb.efe')
plane_2 = PlaneData(r'DATA\l_6lamb_n_97_3-0lamb.efe')
plane_3 = PlaneData(r'DATA\l_6lamb_n_97_3-2lamb.efe')

定义变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 定义变量
dim = plane_1.N # 设置电场平面维度为256
jqx = 0 # 截取位置的横坐标
jqy = 0 # 截取位置的纵坐标
plane_W = dim # 图像宽度
plane_H = dim # 图像高度
batch_size = 1 # 批处理大小为1

# 定义截取平面参数
lamb = 299792458 / 1.65e9 # 波长(单位:米)
pixelsize = lamb / 8 # 像素尺寸(单位:米)
N = dim # 像素数量
L = N * pixelsize # 物体平面和图像平面的长度(单位:米)

# 定义机器学习参数
Steps = 5000 # 迭代步数
LR = 0.01 # 学习率
noise_level = 0 # 噪声水平

定义网络结构

1
2
3
4
5
6
7
8
9
10
11
# 定义输入层,用于接收噪声和未测量数据
with tf.compat.v1.variable_scope('input'):
rand = tf.placeholder(shape=(dim, dim), dtype=tf.float32, name='noise')
true_measure = tf.constant(plane_2.abs, dtype=tf.float32, name='diffraction')
inpt = true_measure + rand

# 定义推断层,用于推断未测量数据
with tf.compat.v1.variable_scope('inference'):
out = model_Unet.inference(inpt, plane_H, plane_W, batch_size)
out = tf.reshape(out, [plane_H, plane_W])
# 上面代码中 model_Unet.inference 即为网络结构

引入物理衍射

1
2
3
# 定义物理模型,用于计算测量数据
out_calculate = Calculate_step.Phy(out, lamb, L, (plane_2.distance - plane_1.distance) * lamb,
plane_1.abs, jqx, jqy, plane_H, plane_W)

上面 Calculate_step.Phy 函数用来模拟物理传播过程,与之前使用 PhaseRetrival 工具包进行相位恢复时的物理传播过程保持一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def Phy(fai_1, lamb, L, Z, E_1_abs, jqx, jqy, plane_H, plane_W):
M = int(fai_1.shape[1])
E_abs_1 = E_1_abs
E_abs_1 = E_abs_1[jqx:jqx + plane_H, jqy:jqy + plane_W]
E_abs_1 = E_abs_1 / np.max(E_abs_1)

# 将'fai_1'转换为复数数据类型
fai_1 = tf.cast(fai_1, dtype=tf.complex128)
# 将'fai_1'乘以虚数单位'j'
fai_1 = 1j * fai_1
# 对'fai_1'进行指数运算
U_in0 = tf.exp(fai_1)
# 对'fai_1'进行幅值加权运算
U_in = tf.multiply(U_in0, E_abs_1)
# 使用'Myfftshift'模块中的自定义函数应用FFT和shift操作
U_in_fft = Myfftshift.fftshift(tf.signal.fft2d(U_in))
U_in_fft = tf.cast(U_in_fft, dtype=tf.complex128)

# 计算空间频率
fx = 1 / L
# 生成一个均匀间隔的数字数组'x'
x = np.linspace(-M / 2, M / 2 - 1, M)
# 将'fx'与'x'相乘
fx = (fx * x)
# 使用'fx'创建一个二维坐标网格
[Fx, Fy] = np.meshgrid(fx, fx)
# 计算一个常数'k'
k = 2 * math.pi / lamb
# k = 2 * math.pi / lamb * 2

# 计算传递函数'G'
# Ax\Ay是口面场的长和宽,Lx\Ly是测量面的长和宽
Ax = 548
Ay = 426
Lx = L * 1000
Ly = L * 1000
# 设置可信比例
Reliable_rate = 1

thea_x = Reliable_rate * np.arctan(((Lx - Ax) / 2 / 2.5 / lamb / 1000))
thea_y = Reliable_rate * np.arctan(((Ly - Ay) / 2 / 2.5 / lamb / 1000))

G1 = (1 - lamb * lamb * (Fx * Fx / np.sin(thea_x) / np.sin(thea_x) + Fy * Fy))
G2 = (1 - lamb * lamb * (Fx * Fx + Fy * Fy / np.sin(thea_y) / np.sin(thea_y)))
G = (1 - lamb * lamb * (Fx * Fx + Fy * Fy))
H = np.ones_like(G) * 1j

for i in range(H.shape[0]):
for j in range(H.shape[1]):
if G[i, j] >= 0:
temp = k * (-1 * Z) * np.sqrt(G[i, j])
H[i, j] = np.exp(1j * temp)
else:
temp = k * (1j * Z) * np.sqrt(1j * G[i, j] * 1j)
H[i, j] = np.exp(-1 * temp)

H = tf.cast(H, dtype=tf.complex128)
U_out_fft = tf.multiply(U_in_fft, H)
# 应用逆FFT和shift操作
U_out = tf.signal.ifft2d(Myfftshift.ifftshift(U_out_fft))
# 对'U_out'进行归一化
U_out_abs = tf.abs(U_out)
temp = tf.reduce_max(tf.reduce_max(U_out_abs))
U_out = tf.abs(U_out) / temp
# 返回最终的电场强度
return U_out

神经网络补充

1
2
3
4
5
6
# 定义损失函数,用于计算均方误差
loss_m = tf.compat.v1.losses.mean_squared_error(out_calculate, true_measure) # 代价函数(不依赖于标记数据)
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=LR)

# 获取更新操作集合
update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)

神经网络初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
# 确保在训练操作之前先执行更新操作
with tf.control_dependencies(update_ops):
train_op = optimizer.minimize(loss_m)

# 初始化saver
saver = tf.compat.v1.train.Saver()

with tf.compat.v1.Session() as sess:
# 初始化全局变量
sess.run(tf.global_variables_initializer())
# 初始化损失值
m_loss = np.zeros([Steps, 1])
temp_m_loss = []

网络迭代优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 开始优化
for step in range(Steps):

# 生成随机数
new_rand = np.random.uniform(0, noise_level, size=(dim, dim)) # noise_level
sess.run([train_op], feed_dict={rand: new_rand})

loss_measure = sess.run(loss_m, feed_dict={rand: new_rand})
m_loss[step] = loss_measure
print('Step:%d Measure loss:%f' % (step, loss_measure))

# 这行代码创建了一个名为 new_rand 的元素全为 0 的 Numpy 数组,形状为 (dim, dim)。
new_rand = np.random.uniform(0, 0.0, size=(dim, dim))
pha_out = sess.run(out, feed_dict={rand: new_rand}).reshape(dim, dim)
out_calculate_temp = sess.run(out_calculate, feed_dict={rand: new_rand}).reshape(dim, dim)

使用U-Net 网络源重构进行相位恢复

下面是通过源重构进行相位恢复,把上面物理衍射的物理过程改为格林公式,模拟等效源在空间中的辐射过程。

模型差异

其机器学习相关的大量代码——网络结构、初始化、迭代过程——与上一个保持一致,只是物理过程部分的代码不同,下面只展示不同部分。

1
2
3
4
5
6
7
8
9
# 定义物理模型,用于计算测量数据
sample1_cords = Calculate_step.set_cords(-1 * int(N / 2) * dx, int(N / 2) * dy, dx, dy,
plane_RS.distance * lamda)
source_cords = Calculate_step.set_cords(-1 * int(N / 2) * dx, int(N / 2) * dy, dx, dy,
plane_RS.distance * lamda * 0.8)
g = Calculate_step.green_function(source_cords, sample1_cords, k, dx, dy)

out_calculate = tf.abs(g @ out)
out_calculate = tf.reshape(out_calculate, [dim, dim])
1
2
3
4
5
6
7
8
9
10
11
12
13
# Calculate_step.set_cords 函数是设置并保存 等效源与测量平面 的坐标的函数,后续传给计算出格林函数的模块。
def set_cords(begin, end, dx, dy, Z):
x_d = np.arange(begin, end, dx)
y_d = np.arange(begin, end, dy)
[x_grid, y_grid] = np.meshgrid(x_d, y_d)
z_grid = Z * np.ones_like(x_grid)

x_cords = x_grid.reshape(-1, 1)
y_cords = y_grid.reshape(-1, 1)
z_cords = z_grid.reshape(-1, 1)

return_cords = np.hstack((y_cords, x_cords, z_cords))
return return_cords
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Calculate_step.green_function 函数是生成等效源到测量平面的格林函数g,进而通过神经网络得到的等效源模拟辐射,计算得到测量面的电场。
def green_function(source_xy, sample_xy, k, dx, dy):
x_source = source_xy[:, 0]
y_source = source_xy[:, 1]
z_source = source_xy[:, 2]

x_sample = sample_xy[:, 0]
y_sample = sample_xy[:, 1]
z_sample = sample_xy[:, 2]

g = np.zeros((len(x_sample), len(x_source)), dtype=complex)

# 矢量化计算格林函数
for m in range(len(x_sample)):
R_ij = np.sqrt((x_sample[m] - x_source) ** 2 + (y_sample[m] - y_source) ** 2 + (z_sample[m] - z_source) ** 2)
g[m, :] = 1 / (4 * np.pi) * (np.exp(-1j * k * R_ij) / R_ij ** 2) * ((z_sample[m] - z_source) * (1j * k + 1 / R_ij)) * dx * dy

return g

使用全连接神经网络进行相位恢复

论文复现

首先,下面是对 PhaseRetrival 工具包中相位恢复相关论文中提到的原理复现。这一部分并不涉及机器学习,是传统的相位恢复算法——使用加权最大相关初始化,并用加权梯度流进行迭代。

按照给定的形状随机生成实数信号(y、A、x)

1
2
3
4
5
6
7
8
9
def generate_data(n, m):
# 生成真实信号 x (n维向量)
x = np.random.randn(n)
x = x / np.linalg.norm(x) # 归一化
# 生成随机高斯测量矩阵 A (m x n)
A = np.random.randn(m, n)
# 生成测量值 y = |A x|
y = np.abs(A @ x)
return A, y, x

定义初始化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def weighted_max_correlation_init(A, y, S_size):
m, n = A.shape
S = np.argsort(y)[-S_size:] # 选择最大的 S_size 个测量值对应的索引
A_S = A[S, :]
y_S = y[S]
# 计算加权矩阵 Y
gamma = 0.5 # 论文中默认的 gamma 值
weights = y_S ** gamma
Y = (A_S.T * weights) @ A_S / S_size

# 计算 Y 的最大特征向量作为初始估计
_, _, Vt = svds(Y, k=1)
z0 = Vt[0]
# 估计信号范数
norm_estimate = np.sqrt(np.mean(y ** 2))
z0 = z0 * norm_estimate
return z0

定义优化方法——加权幅流法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def reweighted_amplitude_flow(A, y, z0, max_iter=3000, mu=0.5, beta=5):
m, n = A.shape
z = z0.copy()
for t in range(max_iter):
# 计算当前估计的幅度
Az = A @ z
Az_abs = np.abs(Az)
# 计算权重
ratios = Az_abs / y
weights = 1 / (1 + beta / ratios)
# 计算梯度
gradient = (A.T @ (weights * (Az - y * (Az / Az_abs)))) / m
# 更新估计
z = z - mu * gradient
# 检查收敛条件
if np.linalg.norm(gradient) < 1e-6:
break
return z

主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def main():
n = 1000 # 信号维度
m = 2 * n - 1 # 测量数量,接近信息论极限
S_size = int(3 * m / 13) # 初始化时选择的测量数量
# 生成数据
A, y, x = generate_data(n, m)

# 加权最大相关初始化
z0 = weighted_max_correlation_init(A, y, S_size)

# 迭代加权梯度流
z_hat = reweighted_amplitude_flow(A, y, z0)

# 计算恢复误差
error = np.linalg.norm(z_hat - x) / np.linalg.norm(x)
print(f"Recovery error: {error}")

引入神经网络实现

下面把神经网络引入,根据之前同学提醒,打算使用端对端的神经网络,通过输入A与y,直接得到待求的x。

1
2
3
4
5
6
# 生成数据的函数保持不变
def generate_data(n, m):
x = np.random.randn(n)
A = np.random.randn(m, n)
y = np.abs(A @ x)
return A, y, x
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 定义神经网络模型
class SignalRecoveryModel(nn.Module):
def __init__(self, m, n):
super(SignalRecoveryModel, self).__init__()
# 输入是 A 和 y 的组合,A 是 m x n 矩阵,y 是 m 维向量
# 将 A 和 y 展平并拼接为一个输入向量
self.input_size = m * n + m # A 的大小是 m * n,y 的大小是 m
self.fc1 = nn.Linear(self.input_size, 512)
self.fc2 = nn.Linear(512, 256)
self.fc3 = nn.Linear(256, n) # 输出是 n 维信号 x
self.relu = nn.ReLU()

def forward(self, A, y):
# 将 A 和 y 展平并拼接
A_flat = A.view(-1) # 将 A 展平为 (m * n) 向量
y_flat = y.view(-1) # 将 y 展平为 m 向量
x_input = torch.cat((A_flat, y_flat), dim=0) # 拼接为 (m * n + m) 向量
x_input = x_input.unsqueeze(0) # 添加 batch 维度
x = self.relu(self.fc1(x_input))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x.squeeze(0) # 去掉 batch 维度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def main():
n = 97 * 2 # 信号维度
m = 2 * n # 测量数量,接近信息论极限

# 生成数据
A, y, x = generate_data(n, m)

# 将数据转换为 PyTorch 张量
A_tensor = torch.FloatTensor(A)
y_tensor = torch.FloatTensor(y)
x_tensor = torch.FloatTensor(x)

# 初始化模型
model = SignalRecoveryModel(m, n)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 1000
for epoch in range(num_epochs):
model.train()
optimizer.zero_grad()
outputs = model(A_tensor, y_tensor)
loss = criterion(outputs, x_tensor)
loss.backward()
optimizer.step()

if (epoch + 1) % 100 == 0:
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

# 测试模型
model.eval()
with torch.no_grad():
z_hat = model(A_tensor, y_tensor).numpy()

# 计算恢复误差
error = np.linalg.norm(z_hat - x) / np.linalg.norm(x)
print(f"Recovery error: {error}")

复数改进

上面的代码只针对实数的情况,但在相位恢复中的数据都是复数,目前的神经网络只支持实数,所以需要对其进行改进,以满足复数的需要,下面是改进后的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 生成复数数据
def generate_data(n, m):
# 生成复数信号 x (n维向量)
x = np.random.randn(n) + 1j * np.random.randn(n)
x = x / np.linalg.norm(x) # 归一化

# 生成复数高斯测量矩阵 A (m x n)
A = np.random.randn(m, n) + 1j * np.random.randn(m, n)

# 生成测量值 y = |A x|
y = np.abs(A @ x) # y 是实数向量

return A, y, x
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 定义复数神经网络模型
class ComplexSignalRecoveryModel(nn.Module):
def __init__(self, m, n):
super(ComplexSignalRecoveryModel, self).__init__()
# 输入维度: A的实部(m*n) + A的虚部(m*n) + y(m) = 2*m*n + m
self.input_size = 2 * m * n + m
# 输出维度: x的实部(n) + x的虚部(n) = 2n
self.output_size = 2 * n

# 定义网络结构
self.fc1 = nn.Linear(self.input_size, 2048)
self.fc2 = nn.Linear(2048, 1024)
self.fc3 = nn.Linear(1024, self.output_size)
self.relu = nn.ReLU()

def forward(self, A_real_flat, A_imag_flat, y_flat):
# 拼接输入: [A_real, A_imag, y]
x_input = torch.cat([A_real_flat, A_imag_flat, y_flat], dim=0)
x_input = x_input.unsqueeze(0) # 添加batch维度

# 前向传播
x = self.relu(self.fc1(x_input))
x = self.relu(self.fc2(x))
x = self.fc3(x)

# 输出拆分为实部和虚部
x_out = x.squeeze(0)
x_real = x_out[:self.output_size // 2]
x_imag = x_out[self.output_size // 2:]
return x_real, x_imag
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def main():
n = 25 # 信号维度
m = 2 * n - 1 # 测量数量

# 生成复数数据
A, y, x = generate_data(n, m)
x_real_true = np.real(x)
x_imag_true = np.imag(x)

# 将A拆分为实部和虚部,并展平
A_real = np.real(A).flatten()
A_imag = np.imag(A).flatten()
y_flat = y.flatten()

# 转换为PyTorch张量
A_real_tensor = torch.FloatTensor(A_real)
A_imag_tensor = torch.FloatTensor(A_imag)
y_tensor = torch.FloatTensor(y_flat)
x_real_tensor = torch.FloatTensor(x_real_true)
x_imag_tensor = torch.FloatTensor(x_imag_true)

# 初始化模型
model = ComplexSignalRecoveryModel(m, n)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 1000
for epoch in range(num_epochs):
model.train()
optimizer.zero_grad()
x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor)

# 计算实部和虚部的损失
loss_real = criterion(x_real_pred, x_real_tensor)
loss_imag = criterion(x_imag_pred, x_imag_tensor)
total_loss = loss_real + loss_imag

total_loss.backward()
optimizer.step()

if (epoch + 1) % 100 == 0:
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss.item():.4f}')

# 测试模型
model.eval()
with torch.no_grad():
x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor)
z_hat = x_real_pred.numpy() + 1j * x_imag_pred.numpy()

# 计算恢复误差
error = np.linalg.norm(z_hat - x) / np.linalg.norm(x)
print(f"Recovery error: {error}")

验证传播函数的正确性

这一部分用来验证传播函数部分代码的正确性,共有两个部分。第一部分是与之前 PhaseRetrival 工具包一样,验证衍射过程的正确性;第二部分是因为在神经网络中需要测量矩阵A的显示表达,所以还需验证其正确性。

首先是衍射过程中函数A和A的逆函数At的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 函数A的实现
def A_function(pixels, masks, padding, dx):
numrows, numcols = masks.shape[:2]
im = pixels.reshape(numrows, numcols)
measurements = np.zeros((numrows // padding, numcols // padding, masks.shape[2]), dtype=np.complex128)

dk_X = 2 * np.pi / (numrows * dx)
for m in range(masks.shape[2]):
masked = im * masks[:, :, m]
ft = np.fft.fft2(np.fft.ifftshift(masked))
measurements[:, :, m] = ft[:numrows // padding, :numcols // padding] * (dk_X ** 2) / (4 * np.pi ** 2)

return measurements.ravel(order='F')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 函数At的实现
def At_function(measurements, numrows, numcols, num_fourier_masks, masks, padding, dx):
dk_X = 2 * np.pi / (numrows * dx)

im_reconstructed = np.zeros((numrows, numcols), dtype=complex)
measurements = np.reshape(measurements, (numrows // padding, numcols // padding, num_fourier_masks), order='F')

for m in range(num_fourier_masks):
this_mask = np.squeeze(masks[:, :, m])
this_measurements = np.squeeze(measurements[:, :, m])
im_reconstructed += np.conj(this_mask) * np.fft.fftshift(np.fft.ifft2(this_measurements, s=(numrows, numcols))) * numrows * numcols * dk_X * dk_X / (4 * np.pi * np.pi)

pixels = im_reconstructed.ravel(order='F')
return pixels

下面是测量矩阵A的显示表达函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
def At_function(measurements, numrows, numcols, num_fourier_masks, masks, padding, dx):
dk_X = 2 * np.pi / (numrows * dx)

im_reconstructed = np.zeros((numrows, numcols), dtype=complex)
measurements = np.reshape(measurements, (numrows // padding, numcols // padding, num_fourier_masks), order='F')

for m in range(num_fourier_masks):
this_mask = np.squeeze(masks[:, :, m])
this_measurements = np.squeeze(measurements[:, :, m])
im_reconstructed += np.conj(this_mask) * np.fft.fftshift(np.fft.ifft2(this_measurements, s=(numrows, numcols))) * numrows * numcols * dk_X * dk_X / (4 * np.pi * np.pi)

pixels = im_reconstructed.ravel(order='F')
return pixels
image-20250407190451927

功能整合

整合上面提到的功能,在下面的代码中,可以完成三个主要的功能:

1
process = ['train', 'test', 'trained_test']
  • 'train':使用随机生成的数据训练模型
  • 'test':不使用神经网络的部分,仅仅验证传播函数的正确性
  • 'trained_test':对训练好的模型进行测试,目前是使用重新随机生成的数据进行测试,接下来还会补充使用FECO导出的电场数据进行测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
if 'train' in process:
# 检查是否有GPU可用
device = torch.device('cuda' if (torch.cuda.is_available() and FLAG_GPU) else 'cpu')

# 生成复数数据
A, y, x = generate_data(n, m)

# 保存测试 A矩阵
file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\test_A.npy"
np.save(file_path, A)
print("已创建并保存test_A.npy数据")

# 初始化模型
model = ComplexSignalRecoveryModel(m, n).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 1000
for epoch in range(num_epochs):
model.train()
optimizer.zero_grad()

# 生成复数数据
A, y, x = generate_data(n, m, A)
x_real_true = np.real(x)
x_imag_true = np.imag(x)

# 将A拆分为实部和虚部,并展平
A_real = np.real(A).ravel(order='F')
A_imag = np.imag(A).ravel(order='F')
y_flat = y.flatten()

# 转换为PyTorch张量
A_real_tensor = torch.FloatTensor(A_real).to(device)
A_imag_tensor = torch.FloatTensor(A_imag).to(device)
y_tensor = torch.FloatTensor(y_flat).to(device)
x_real_tensor = torch.FloatTensor(x_real_true).to(device)
x_imag_tensor = torch.FloatTensor(x_imag_true).to(device)

x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor)

# 计算实部和虚部的损失
loss_real = criterion(x_real_pred, x_real_tensor)
loss_imag = criterion(x_imag_pred, x_imag_tensor)
total_loss = loss_real + loss_imag

total_loss.backward()
optimizer.step()

if (epoch + 1) % 100 == 0:
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss.item():.4f}')

# 测试模型
model.eval()
with torch.no_grad():
x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor)
z_hat = x_real_pred.cpu().numpy() + 1j * x_imag_pred.cpu().numpy()

# 计算恢复误差
error = np.linalg.norm(z_hat - x) / np.linalg.norm(x)
error = mean_squared_error(np.real(z_hat), np.real(x)) + mean_squared_error(np.imag(z_hat), np.imag(x))
print(f"Recovery error: {error}")

model_path = r"E:\PhysenNet_inversion\DATA\process_DATA\complex_signal_model.pth"

# 训练完成后保存模型
torch.save(model, model_path)
print(f"模型已保存到 {model_path}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
if 'test' in process:
# 参数初始化
f = 1.65e9
lamda = constants.speed_of_light / f
k0 = 2 * np.pi / lamda
dtheta = np.pi / 180
dphi = np.pi / 180
theta = np.arange(-np.pi / 2, np.pi / 2 + dtheta, dtheta)
phi = np.arange(0, np.pi + dphi, dphi)

NF_data1 = r'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\NearField3_10X10P25_0P25.efe'
NF_data2 = r'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\NearField4_10X10P25_0P25.efe'
NF_data3 = r'E:\组会\其他\实验室历史文件\code\phase retrival\NFdata\NearField5_10X10P25_0P25.efe'
dz_values = [0, 1] # 衍射面间距数组
z0, z1, z2 = 3.0 * lamda, 4.0 * lamda, 5.0 * lamda
Reliable_rate = 1

# 加载近场数据
Ex1, Ey1, xd1, yd1, z = load_NF(NF_data1) # Ex_matrix, Ey_matrix, xd, yd, z
Ex2, Ey2, xd2, yd2, z = load_NF(NF_data2)
Ex3, Ey3, xd3, yd3, z = load_NF(NF_data3)

# 转换为1D向量 (考虑numpy存储顺序)
Ey1_1D = Ey1.flatten(order='F') # 使用Fortran列优先顺序匹配MATLAB
Ey2_1D = Ey2.flatten(order='F')
Ey3_1D = Ey3.flatten(order='F')

# 计算采样参数 (添加类型转换确保整数)
M = int(len(xd2)) # x方向采样点数
N = int(len(yd2)) # y方向采样点数
dx = float(xd2[1] - xd2[0]) # x方向采样间隔
dy = float(yd2[1] - yd2[0]) # y方向采样间隔

# 谱域滤波参数
padding = 1
MI = padding * M
NI = padding * N

# 生成波数网格
m = np.arange(-MI // 2, MI // 2)
n = np.arange(-NI // 2, NI // 2)
k_X_Rectangular = 2 * np.pi * m / (MI * dx)
k_Y_Rectangular = 2 * np.pi * n / (NI * dy)

# 计算波数间隔
dk_X = k_X_Rectangular[1] - k_X_Rectangular[0]
dk_Y = k_Y_Rectangular[1] - k_Y_Rectangular[0]

# 生成波数网格
k_Y_Rectangular_Grid, k_X_Rectangular_Grid = np.meshgrid(k_Y_Rectangular, k_X_Rectangular)

# 计算kz
kz = np.sqrt(k0 ** 2 - k_X_Rectangular_Grid ** 2 - k_Y_Rectangular_Grid ** 2 + 0j)

# 生成可靠性区域
W = 0.55
H = 0.428

# 扫描区域尺寸
w = dx * (M - 1)
h = dy * (N - 1)

# 计算可靠性角度
thetax_sure = Reliable_rate * np.arctan(0.5 * (w - W) / z1)
thetay_sure = Reliable_rate * np.arctan(0.5 * (h - H) / z1)

# 定义文件路径
file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\UR.npy"
# 检查文件是否存在
if os.path.isfile(file_path):
print("UR.npy 文件存在")
# 加载文件
UR = np.load(file_path)
print("成功加载 UR.npy 文件")
else:
# 生成滤波矩阵UR
print("UR.npy 文件不存在")
UR = np.zeros((MI, NI), dtype=int)
for i in range(MI):
for j in range(NI):
condition1 = (k_X_Rectangular[i] ** 2 / (k0 * np.sin(thetax_sure)) ** 2 +
k_Y_Rectangular[j] ** 2 / k0 ** 2) < 1.0
condition2 = (k_X_Rectangular[i] ** 2 / k0 ** 2 +
k_Y_Rectangular[j] ** 2 / (k0 * np.sin(thetay_sure)) ** 2) < 1.0
if condition1 and condition2:
UR[i, j] = 1

# 保存文件
np.save(file_path, UR)
print("已创建并保存UR.npy数据")

# 计算 fy1
fy1 = np.fft.fftshift(np.fft.ifft2(Ey2, s=(MI, NI))) * MI * NI * dx * dy
# 计算 fy_Magnitude
fy_Magnitude = 20 * np.log10(np.abs(fy1))
# 计算 fy_H_Magnitude
fy_H_Magnitude = 20 * np.log10(np.abs(fy1 * UR))

# ================== 创建傅里叶掩码 ==================
# 获取网格尺寸
numrows, numcols = k_Y_Rectangular_Grid.shape # 假设k_Y_Rectangular_Grid已定义
num_fourier_masks = 2

isim = np.imag(kz) != 0
kz[isim] = -1 * kz[isim]

# 定义文件路径
file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\masks.npy"
# 检查文件是否存在
if os.path.isfile(file_path):
print("masks.npy 文件存在")
# 加载文件
masks = np.load(file_path)
print("成功加载 masks.npy 文件")
else:
print("masks.npy 文件不存在")
masks = np.zeros((numrows, numcols, num_fourier_masks), dtype=np.complex128)

# ================== 生成衍射掩码 ==================
for i in range(num_fourier_masks):
phase = -1j * kz * dz_values[i] * lamda
masks[:, :, i] = UR * np.exp(phase)

# 保存文件
np.save(file_path, masks)
print("已创建并保存masks.npy数据")

# ================== 验证数据计算 ==================
# 傅里叶变换和预处理
fy2 = np.fft.fftshift(np.fft.ifft2(Ey2, s=(MI, NI))) * MI * NI * dx * dy
x = fy2.flatten() # 展平为一维向量 .ravel(order='F') .flatten()

# b_cal = np.abs(A_function(x, masks, padding, dx)) # 矩阵乘法

# 定义文件路径
file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\A.npy"
# 检查文件是否存在
if os.path.isfile(file_path):
print("A.npy 文件存在")
# 加载文件
A = np.load(file_path)
print("成功加载 A.npy 文件")
else:
print("A.npy 文件不存在")
A = construct_A_matrix(masks, padding, dx)
# 保存文件
np.save(file_path, A)
print("已创建并保存A.npy数据")

b_cal = np.abs(A @ x)

b_real = np.concatenate([np.abs(Ey2_1D.ravel(order='F')), np.abs(Ey3_1D.ravel(order='F'))])

# 误差计算
b_err_1 = np.sqrt(np.sum((np.abs(b_real) - b_cal) ** 2) / np.sum(b_cal ** 2))
b_err_2 = np.abs((b_real - b_cal) / b_cal)

# 重塑误差矩阵
b_err = b_err_2.reshape((numrows // padding, numcols // padding, num_fourier_masks))
E1_err = np.squeeze(b_err[:, :, 0])
E2_err = np.squeeze(b_err[:, :, 1])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
if 'trained_test' in process:
# 检查是否有GPU可用
device = torch.device('cuda' if (torch.cuda.is_available() and FLAG_GPU) else 'cpu')

model_path = r"E:\PhysenNet_inversion\DATA\process_DATA\complex_signal_model.pth"

if os.path.exists(model_path):
print("发现训练模型,加载模型中...")
model = torch.load(model_path).to(device)

# 生成测试数据
A, y, x = generate_data(n, m)

file_path = r"E:\PhysenNet_inversion\DATA\process_DATA\test_A.npy"
if os.path.isfile(file_path):
print("test_A.npy 文件存在")
# 加载文件
A = np.load(file_path)
print("成功加载 test_A.npy 文件")
else:
pass

# 将A拆分为实部和虚部,并展平
A_real = np.real(A).ravel(order='F')
A_imag = np.imag(A).ravel(order='F')
y_flat = y.flatten()

# 转换为PyTorch张量
A_real_tensor = torch.FloatTensor(A_real).to(device)
A_imag_tensor = torch.FloatTensor(A_imag).to(device)
y_tensor = torch.FloatTensor(y_flat).to(device)

# 测试模型
model.eval()
with torch.no_grad():
x_real_pred, x_imag_pred = model(A_real_tensor, A_imag_tensor, y_tensor)
z_hat = x_real_pred.cpu().numpy() + 1j * x_imag_pred.cpu().numpy()

# 计算恢复误差
error = np.linalg.norm(z_hat - x) / np.linalg.norm(x)
error = mean_squared_error(np.real(z_hat), np.real(x)) + mean_squared_error(np.imag(z_hat), np.imag(x))
print(f"Recovery error: {error}")

频谱域滤波的作用

Spectral Domain Filtering(频谱域滤波) 是一种在频域中对信号进行处理的技术,通过傅里叶变换将信号从时域/空域转换到频域,然后针对特定频率成分进行增强或抑制。常见的操作包括低通滤波(保留低频)、高通滤波(保留高频)或带通滤波(保留特定频段)。

主要作用

  • 去噪与平滑:抑制高频噪声(如传感器噪声或干扰)。
  • 特征提取:增强信号中的关键频率成分(如边缘检测中的高频信号)。
  • 带宽约束:限制信号的频率范围以符合物理系统的实际带宽(如光学系统的衍射极限)。
  • 先验知识嵌入:在迭代优化中引入物理约束(如支持域、非负性等)。

在电磁场相位恢复中的必要性

相位恢复的目标是从仅有的强度测量(如光强或电场模值)中重建丢失的相位信息(如相干衍射成像、全息术)。其核心挑战在于该问题是病态(ill-posed)的,即解不唯一或不稳定。频谱域滤波在此过程中至关重要,原因如下:

抑制噪声与伪影

  • 实际测量中噪声通常存在于高频部分(如散粒噪声、热噪声)。
  • 迭代恢复过程中,噪声会被反复放大,导致解偏离真实值。
  • 频谱域滤波(如低通滤波)可有效抑制高频噪声,提升重建质量。

施加物理约束

  • 电磁场传播受限于物理系统带宽(如光学系统的数值孔径限制)。
  • 超出系统截止频率的成分可能是非物理的,需通过滤波去除(如带通滤波匹配系统传递函数)。

稳定迭代算法

  • 相位恢复算法(如Gerchberg-Saxton、Fienup算法)需在空域和频域交替迭代,施加约束。
  • 频谱域滤波作为频域约束的一部分,可防止算法发散,加速收敛。

解决病态性问题

  • 相位恢复的数学本质是求解非线性方程,需额外约束(如稀疏性、平滑性)。
  • 频谱域滤波通过限制解的频域特性,缩小解空间,使问题适定(well-posed)。

典型应用示例

在相干衍射成像(CDI)中:

  1. 实验测得衍射图样(频域强度)。

  2. 通过傅里叶变换迭代更新相位,每次迭代中:

    频域滤波:限制频率范围与系统带宽一致,去除超出截止频率的成分。

  3. 未滤波的迭代可能导致高频噪声主导,最终重建失败。

在相位恢复算法中频谱域滤波的作用

在相位恢复算法(如 Gerchberg-Saxton(GS)算法Fienup算法)中,“空域和频域交替迭代,施加约束”是核心操作。这一过程通过反复在两个域(空域和频域)之间传递信号,并施加不同的物理约束,逐步逼近真实的相位解。

角域(角谱)的基本概念:

角域,通常称为角谱,是指通过傅里叶变换将空间分布的电场或光场分解为不同传播方向的平面波的集合。每个平面波的传播方向由对应的空间频率决定,这些方向的角度分布即构成角谱。

平面波的波矢 $k=(k_x,k_y,k_z)$决定了波的传播方向。根据几何关系,传播方向的角度 $θ$(例如相对于z轴的倾斜角)满足:

$$\sin\theta = \frac{\sqrt{k_x^2 + k_y^2}}{k} \quad$$​

因此,空间频率 (kx,ky)直接对应传播角度。傅里叶变换后的每个点 (kx,ky) 对应一个特定方向的平面波分量,其幅度和相位由傅里叶系数给出。

通过傅里叶变换,复杂的波场被分解为无数个不同方向的平面波叠加。角谱即表示这些平面波的权重分布,反映了波场中各个传播方向的重要性。

角域的物理意义

  • 平面波分解
    任何复杂的波(如通过狭缝的光波)都可视为不同方向传播的平面波的叠加。角谱描述了这些平面波的组成,例如:

    • 一束平行光对应角谱中单一方向的峰值;
    • 经过小孔衍射的光波,角谱会扩展,显示多方向的传播分量。
  • 传播特性分析
    角谱可以解释波的传播行为。例如:

    • 远场(夫琅禾费衍射):远场衍射图样直接对应角谱的强度分布,即不同方向的光强;
    • 近场(菲涅尔衍射):角谱通过 相位修正 仍能描述波的传播演化。
  • 实际应用

    • 光学成像:透镜的聚焦效果可通过角谱中 高频成分(大角度传播的波)的截断来解释(如数值孔径限制);
    • 无线通信:天线辐射方向图的分析本质上是角谱的体现。

示例说明

假设一束光垂直照射一个狭缝:

  • 空间域:狭缝处的电场分布$ E(x) $是矩形函数;
  • 傅里叶变换:得到频域的$ sinc $函数$\tilde{E}(k_x)$,对应角谱;
  • 物理意义:$sinc$ 函数的零点间隔表明哪些传播角度(空间频率)被抑制,对应衍射图样的暗条纹。

关键公式

$$ \text{角谱} \propto \tilde{E}(k_x, k_y), \quad \text{传播角度} \propto \arctan\left(\frac{\sqrt{k_x^2 + k_y^2}}{k_z}\right).$$