forked from Bick95/PPO
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinput_net_modules.py
More file actions
200 lines (155 loc) · 8.12 KB
/
Copy pathinput_net_modules.py
File metadata and controls
200 lines (155 loc) · 8.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import torch
import torch.nn as nn
import torch.nn.functional as F
from utils_input_net_modules import extract_params_from_structure, size_preserving_padding, out_dim
class InCNN(nn.Module):
def __init__(self,
input_sample: torch.tensor,
nonlinearity: torch.nn.functional = F.relu,
network_structure: list = None,
):
super(InCNN, self).__init__()
# Determine dimensions of observation (input) sample
data_height = input_sample.shape[0]
data_width = input_sample.shape[1]
# The number of input channels = nr of color channels times nr of stacked environmental states used to get one Markov state:
if len(input_sample.shape) == 3:
# There is a color/Markov-channel-dimension
in_channels = input_sample.shape[2]
else:
# There is no third dimension along which different color channels can be indexed
in_channels = 1
print("In InCNN model:")
print("Shape input sample:", input_sample.shape)
print("Input sample - Height: %i, Width: %i, Color/Markov channels: %i" % (data_height, data_width, in_channels))
# Default setup of network
if network_structure is None:
network_structure = [
# Dicts for conv layers
{
'out_channels': 32, # 16 output channels = 16 filters
'kernel_size': 4, # 8x8 kernel/filter size
'stride': 1
},
{
'out_channels': 16,
'kernel_size': 8,
'stride': 2
},
# Nr. of nodes for fully connected layers
256,
64,
]
# Set up processing pipeline (i.e. policy)
self.pipeline = []
for i, layer_specs in enumerate(network_structure):
if isinstance(layer_specs, dict):
# Determine padding parameter
if isinstance(layer_specs['padding'], int):
# Padding parameter is provided as int
padding = (layer_specs['padding'], layer_specs['padding'])
elif isinstance(layer_specs['padding'], tuple):
# Padding parameter is provided as tuple
padding = layer_specs['padding']
elif isinstance(layer_specs['padding'], str) and (
'preserve' in layer_specs['padding'].lower() or
'auto' in layer_specs['padding'].lower()
):
# Padding is supposed to preserve input's size after convolution
padding_vertical = size_preserving_padding(i=data_height, k=layer_specs['kernel_size'], s=layer_specs['stride'])
padding_horizontal = size_preserving_padding(i=data_width, k=layer_specs['kernel_size'], s=layer_specs['stride'])
padding = (padding_vertical, padding_horizontal)
else:
# No padding desired
padding = (0, 0)
# Add conv layer
self.pipeline.append(
nn.Conv2d(in_channels=in_channels,
out_channels=layer_specs['out_channels'] if 'out_channels' in layer_specs.keys() else 32,
kernel_size=layer_specs['kernel_size'] if 'kernel_size' in layer_specs.keys() else 4,
stride=layer_specs['stride'] if 'stride' in layer_specs.keys() else 1,
padding=padding,
dilation=layer_specs['dilation'] if 'dilation' in layer_specs.keys() else 1
)
)
# Compute dimensions of output of newly added Conv2d layer
data_height = out_dim(i=data_height,
p=padding[0],
d=extract_params_from_structure(structure=network_structure, index=i, key='dilation', vertical_dim=True, default=1),
k=extract_params_from_structure(structure=network_structure, index=i, key='kernel_size', vertical_dim=True, default=4),
s=extract_params_from_structure(structure=network_structure, index=i, key='stride', vertical_dim=True, default=1))
data_width = out_dim(i=data_width,
p=padding[1],
d=extract_params_from_structure(structure=network_structure, index=i, key='dilation', vertical_dim=False, default=1),
k=extract_params_from_structure(structure=network_structure, index=i, key='kernel_size', vertical_dim=False, default=4),
s=extract_params_from_structure(structure=network_structure, index=i, key='stride', vertical_dim=False, default=1))
# Prepare for next iteration: this layer's nr of output channels/filters is equal to nr of next Conv2d layer's input channels
in_channels = layer_specs['out_channels']
elif isinstance(layer_specs, int) and isinstance(network_structure[i-1], dict):
# Before adding a fully connected (FC) layer, add flattening first and then the FC layer
# Compute layer's input size
flattened_size = data_height * data_width * network_structure[i-1]['out_channels']
# Add flattening layer
self.pipeline.append(
nn.Flatten(start_dim=1)
)
# Add actual FC layer
self.pipeline.append(
nn.Linear(flattened_size, network_structure[i])
)
else:
# Add fully connected layer
self.pipeline.append(
nn.Linear(network_structure[i-1], network_structure[i])
)
# Register all layers
for i, layer in enumerate(self.pipeline):
self.add_module("in_cnn_layer_" + str(i), layer)
self.nonlinearity = nonlinearity
def forward(self, x):
# Change dimensionality from (Batch, Height, Width, Color) to (Batch, Color, Height, Width)
x = x.permute(0, 3, 1, 2)
for layer in self.pipeline:
x = self.nonlinearity(layer(x))
return x
class InMLP(nn.Module):
def __init__(self,
input_sample: torch.tensor,
nonlinearity: torch.nn.functional = F.relu,
network_structure: list = None,
):
super(InMLP, self).__init__()
# Compute nr of input features for given gym env for a single batch-example (Assumption: no flattening needed!)
input_features = input_sample.shape[-1]
print("In InMLP:")
print("input_features:", input_features)
# Construct NN-processing pipeline consisting of concatenation of layers to be applied to any input
self.pipeline = [
# Add input layer
nn.Linear(
input_features,
network_structure[0] if isinstance(network_structure, list) else network_structure
)
]
# Add optional hidden layers (in accordance with specification in network_architecture)
if isinstance(network_structure, list):
for i in range(1, len(network_structure)):
self.pipeline.append(
nn.Linear(network_structure[i-1], network_structure[i])
)
# Register all layers
for i, layer in enumerate(self.pipeline):
self.add_module("in_mlp_layer_" + str(i), layer)
self.nonlinearity = nonlinearity
def forward(self, x):
# Forward-pass through the network
for layer in self.pipeline:
x = self.nonlinearity(layer(x))
return x
# Testing
#import gym
#env = gym.make('CartPole-v1')
#observation = env.observation_space.sample()
#observation_shape = torch.tensor(observation.shape)
#net = MLP_Module(observation_shape[0])
#net(torch.tensor(observation))