
Reference

BestParameterFinder

Source code in code\bpf.py
class BestParameterFinder:
	def __init__(self, metric: Optional[Callable[["BestParameterFinder", np.ndarray], float]] = None):
		"""
		Initializes the BestParameterFinder.

		Args:
			metric (Optional[Callable[[BestParameterFinder, np.ndarray], float]]):
				A custom metric function. Defaults to `expWithStd`.
		"""
		self.metric = metric or self.expWithStd
		self.p_g: Optional[float] = None
		self.c: Optional[float] = None

	def nInfUniform(self, voltages: np.ndarray) -> float:
		"""
		Computes the distance between the sorted voltages and a uniform reference
		distribution. Note: despite the nInf name, np.linalg.norm is used with its
		default ord, i.e. the Euclidean 2-norm.

		Args:
			voltages (np.ndarray): Array of voltage values (sorted in place).

		Returns:
			float: Norm of the difference from the uniform reference.
		"""
		voltages.sort()
		uniform = np.array([x / (len(voltages) - 1) for x in range(len(voltages))])
		return np.linalg.norm(abs(voltages - uniform))

	def nInfExp(self, voltages: np.ndarray, base: float = 10) -> float:
		"""
		Computes the distance between the sorted voltages and an exponential reference
		distribution. Note: despite the nInf name, np.linalg.norm is used with its
		default ord, i.e. the Euclidean 2-norm.

		Args:
			voltages (np.ndarray): Array of voltage values (sorted in place).
			base (float): Base of the exponential function. Defaults to 10.

		Returns:
			float: Norm of the difference from the exponential reference.
		"""
		# `dist` is a module-level cache of the reference distribution (assumed to be
		# defined near the top of bpf.py). It is rebuilt only when the length changes,
		# so a changed `base` is ignored while the length stays the same.
		global dist
		voltages.sort()
		if len(dist) != len(voltages):
			dist = np.array([np.power(base, (x / (len(voltages) - 1)) - 1) for x in range(len(voltages))])
		return np.linalg.norm(abs(voltages - dist))

	def median(self, voltages: np.ndarray, value: float = 0.5) -> float:
		"""
		Computes the absolute difference between the median voltage and a given value.

		Args:
			voltages (np.ndarray): Array of voltage values.
			value (float): Value to compare the median to. Defaults to 0.5.

		Returns:
			float: Absolute difference from the median.
		"""
		voltages.sort()
		return abs(voltages[int(len(voltages) / 2)] - value)

	def minimum(self, voltages: np.ndarray, value: float = 0.1) -> float:
		"""
		Computes the absolute difference between the minimum voltage and a given value.

		Args:
			voltages (np.ndarray): Array of voltage values.
			value (float): Value to compare the minimum to. Defaults to 0.1.

		Returns:
			float: Absolute difference from the minimum.
		"""
		voltages.sort()
		return abs(voltages[0] - value)

	def minWithStd(self, voltages: np.ndarray, value: float = 0.1) -> float:
		"""
		Computes the normalized difference between the minimum voltage and a given value.

		Args:
			voltages (np.ndarray): Array of voltage values.
			value (float): Value to compare the minimum to. Defaults to 0.1.

		Returns:
			float: Normalized absolute difference using standard deviation.
		"""
		voltages.sort()
		return abs(voltages[0] - value) / np.std(voltages)

	def expWithStd(self, voltages: np.ndarray, base: float = 10) -> float:
		"""
		Computes the normalized exponential distance.

		Args:
			voltages (np.ndarray): Array of voltage values.
			base (float): Base of the exponential. Defaults to 10.

		Returns:
			float: Normalized exponential distance.
		"""
		return self.nInfExp(voltages, base) / np.std(voltages)

	def setResistanceToGround(self, p_g: float) -> None:
		"""
		Sets the resistance to ground parameter.

		Args:
			p_g (float): Resistance to ground value (logarithmic scale will be used).
		"""
		self.p_g = np.log(p_g)

	def setKernelParameter(self, c: float) -> None:
		"""
		Sets the kernel parameter.

		Args:
			c (float): Kernel parameter (logarithmic scale will be used).
		"""
		self.c = np.log(c)

	def calculateFor(
		self,
		landmarks: List,
		data: Union[create_data.Data, kmeans.Partitions],
		c: float,
		p_g: float,
		approx: bool = False,
		approx_epsilon: Optional[float] = None,
		approx_iters: Optional[int] = None
	) -> Union[float, tuple[np.ndarray, voltage.Problem]]:
		"""
		Calculates voltages and applies the metric.

		Args:
			landmarks (List): Landmarks to add to the problem.
			data (Union[create_data.Data, kmeans.Partitions]): Input data.
			c (float): Kernel parameter (log space).
			p_g (float): Resistance to ground (log space).
			approx (bool): Whether to use approximation. Defaults to False.
			approx_epsilon (Optional[float]): Epsilon value for approximation.
			approx_iters (Optional[int]): Number of approximation iterations.

		Returns:
			Union[float, tuple[np.ndarray, voltage.Problem]]: Metric value or voltages and problem.
		"""

		if isinstance(data, create_data.Data):
			meanProblem = voltage.Problem(data)
			meanProblem.timeStart()
			meanProblem.setKernel(meanProblem.gaussiankernel)
			meanProblem.setWeights(np.exp(c))

		elif isinstance(data, kmeans.Partitions):
			partitions = data
			meanProblem = voltage.Problem(partitions.centers)
			meanProblem.timeStart()
			meanProblem.setKernel(meanProblem.gaussiankernel)
			meanProblem.setPartitionWeights(partitions, np.exp(c))

		else:
			raise ValueError("Unsupported data type")

		meanProblem.addUniversalGround(np.exp(p_g))
		meanProblem.addLandmarks(landmarks)

		meanProblem.timeEnd()

		if approx:
			voltages = np.array(voltage.Solver(meanProblem).approximate_voltages(approx_epsilon, approx_iters))
		else:
			voltages = np.array(voltage.Solver(meanProblem).compute_voltages())

		meanProblem.timeEnd()

		if self.metric:
			return self.metric(voltages)
		else:
			return voltages, meanProblem

	def bestParameterFinder(
		self,
		landmarks: List,
		data: Union[create_data.Data, kmeans.Partitions],
		minBound: float = -25,
		maxBound: float = -1,
		granularity: int = 5,
		epsilon: float = 1,
		approx: Optional[int] = None
	) -> tuple[float, float]:
		"""
		Finds optimal (C, P_G) parameters minimizing the metric.

		Args:
			landmarks (List): Landmarks to use in solving.
			data (Union[create_data.Data, kmeans.Partitions]): Input dataset.
			minBound (float): Minimum log-bound for search. Defaults to -25.
			maxBound (float): Maximum log-bound for search. Defaults to -1.
			granularity (int): Granularity of grid search. Defaults to 5.
			epsilon (float): Precision threshold. Defaults to 1.
			approx (Optional[int]): Approximation iteration count. Defaults to None.

		Returns:
			tuple[float, float]: Best (C, P_G) parameters (in real scale).
		"""
		window_size = (maxBound - minBound) / 2
		bestc = minBound + window_size
		bestg = minBound + window_size
		val = float('inf')

		while window_size > epsilon:
			cs = [bestc + x * window_size / granularity for x in range(-granularity + 1, granularity)]
			gs = [bestg + x * window_size / granularity for x in range(-granularity + 1, granularity)]

			if self.c is not None:
				cs = [self.c]
			if self.p_g is not None:
				gs = [self.p_g]

			for c in cs:
				for g in gs:
					try:
						if approx is None:
							tempval = self.calculateFor(landmarks, data, c, g)
						else:
							tempval = self.calculateFor(landmarks, data, c, g, approx=True, approx_iters=approx)

						if val > tempval:
							bestc, bestg = c, g
							val = tempval
					except ValueError:
						pass

			window_size /= granularity

		return np.exp(bestc), np.exp(bestg)

	def visualizations(self, voltages: List[np.ndarray], fileStarter: str) -> None:
		"""
		Generates and saves PCA and MDS visualizations of the voltage data.

		Args:
			voltages (List[np.ndarray]): List of voltage arrays.
			fileStarter (str): File name prefix for saving plots.

		Returns:
			None
		"""

		points = np.array(list(map(list, zip(*voltages))))

		pca = PCA(n_components=2)
		points_2d = pca.fit_transform(points)

		plt.scatter(points_2d[:, 0], points_2d[:, 1], s=10)
		plt.xlabel("PCA Component 1")
		plt.ylabel("PCA Component 2")
		plt.title("PCA Projection of Solver Outputs")
		plt.savefig(fileStarter + "_PCA.png")
		plt.clf()

		mds = MDS(n_components=2, random_state=42)
		transformed_points = mds.fit_transform(points)

		plt.figure(figsize=(8, 6))
		plt.scatter(transformed_points[:, 0], transformed_points[:, 1], c='blue', edgecolors='black')
		plt.xlabel("MDS Dimension 1")
		plt.ylabel("MDS Dimension 2")
		plt.title("Multidimensional Scaling (MDS) to 2D")
		plt.savefig(fileStarter + "_MDS.png")
		plt.clf()

__init__(metric=None)

Initializes the BestParameterFinder.

Parameters:

- metric (Optional[Callable[[BestParameterFinder, ndarray], float]], default None): A custom metric function. Defaults to expWithStd.
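
A minimal usage sketch (module and file names are illustrative; it assumes bpf.py and create_data.py are importable as in the listings on this page). Note that whatever is supplied is invoked as self.metric(voltages), i.e. with just the voltages array:

from bpf import BestParameterFinder
import create_data

data = create_data.Data("points.json")   # hypothetical dataset file
finder = BestParameterFinder()           # defaults to the expWithStd metric
finder.metric = finder.median            # optionally swap in another built-in metric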

bestParameterFinder(landmarks, data, minBound=-25, maxBound=-1, granularity=5, epsilon=1, approx=None)

Finds optimal (C, P_G) parameters minimizing the metric.

Parameters:

- landmarks (List, required): Landmarks to use in solving.
- data (Union[Data, Partitions], required): Input dataset.
- minBound (float, default -25): Minimum log-bound for search.
- maxBound (float, default -1): Maximum log-bound for search.
- granularity (int, default 5): Granularity of grid search.
- epsilon (float, default 1): Precision threshold.
- approx (Optional[int], default None): Approximation iteration count.

Returns:

- tuple[float, float]: Best (C, P_G) parameters (in real scale).
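
Continuing the sketch above, a full search looks like this (the landmark list is illustrative; landmarks are whatever voltage.Problem.addLandmarks expects):

landmarks = [0]
C, P_G = finder.bestParameterFinder(landmarks, data)

The search is a shrinking grid in log space: each round evaluates a (2 * granularity - 1) by (2 * granularity - 1) grid around the current best pair, divides window_size by granularity, and stops once window_size falls to epsilon; the winning pair is mapped back to real scale with np.exp.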


calculateFor(landmarks, data, c, p_g, approx=False, approx_epsilon=None, approx_iters=None)

Calculates voltages and applies the metric.

Parameters:

- landmarks (List, required): Landmarks to add to the problem.
- data (Union[Data, Partitions], required): Input data.
- c (float, required): Kernel parameter (log space).
- p_g (float, required): Resistance to ground (log space).
- approx (bool, default False): Whether to use approximation.
- approx_epsilon (Optional[float], default None): Epsilon value for approximation.
- approx_iters (Optional[int], default None): Number of approximation iterations.

Returns:

- Union[float, tuple[np.ndarray, voltage.Problem]]: Metric value, or the voltages and the problem.
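
Both c and p_g are expected in log space (np.exp is applied internally). Also note that since __init__ always installs a metric, the (voltages, problem) branch is only reached if the metric attribute is cleared by hand. Continuing the sketch above:

import numpy as np

score = finder.calculateFor(landmarks, data, c=np.log(1e-3), p_g=np.log(1e-6))

finder.metric = None                     # opt out of the metric to get raw results
voltages, problem = finder.calculateFor(landmarks, data, np.log(1e-3), np.log(1e-6))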


expWithStd(voltages, base=10)

Computes the normalized exponential distance.

Parameters:

- voltages (ndarray, required): Array of voltage values.
- base (float, default 10): Base of the exponential.

Returns:

- float: Normalized exponential distance.


median(voltages, value=0.5)

Computes the absolute difference between the median voltage and a given value.

Parameters:

- voltages (ndarray, required): Array of voltage values.
- value (float, default 0.5): Value to compare the median to.

Returns:

- float: Absolute difference from the median.


minWithStd(voltages, value=0.1)

Computes the normalized difference between the minimum voltage and a given value.

Parameters:

- voltages (ndarray, required): Array of voltage values.
- value (float, default 0.1): Value to compare the minimum to.

Returns:

- float: Normalized absolute difference using standard deviation.


minimum(voltages, value=0.1)

Computes the absolute difference between the minimum voltage and a given value.

Parameters:

- voltages (ndarray, required): Array of voltage values.
- value (float, default 0.1): Value to compare the minimum to.

Returns:

- float: Absolute difference from the minimum.


nInfExp(voltages, base=10)

Computes the distance between the sorted voltages and an exponential reference distribution. (Despite the nInf name, np.linalg.norm is used with its default ord, the Euclidean 2-norm.)

Parameters:

- voltages (ndarray, required): Array of voltage values.
- base (float, default 10): Base of the exponential function.

Returns:

- float: Norm of the difference from the exponential reference.
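
For n sorted voltages the reference values are base**(k / (n - 1) - 1) for k = 0, ..., n - 1. For example, with n = 3 and base = 10 the targets are 0.1, 10**-0.5 ≈ 0.316, and 1.0, and the return value is the norm of the elementwise differences from those targets.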


nInfUniform(voltages)

Computes the distance between the sorted voltages and a uniform reference distribution. (Despite the nInf name, np.linalg.norm is used with its default ord, the Euclidean 2-norm.)

Parameters:

- voltages (ndarray, required): Array of voltage values.

Returns:

- float: Norm of the difference from the uniform reference.


setKernelParameter(c)

Sets the kernel parameter.

Parameters:

- c (float, required): Kernel parameter (logarithmic scale will be used).
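
Setting this pins the C axis of bestParameterFinder's grid search, so only the other parameter is swept (setResistanceToGround below pins P_G the same way). Continuing the sketch above:

finder.setKernelParameter(1e-3)   # real-scale value; its natural log is stored
C, P_G = finder.bestParameterFinder(landmarks, data)   # now only varies P_G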

setResistanceToGround(p_g)

Sets the resistance to ground parameter.

Parameters:

- p_g (float, required): Resistance to ground value (logarithmic scale will be used).

visualizations(voltages, fileStarter)

Generates and saves PCA and MDS visualizations of the voltage data.

Parameters:

- voltages (List[ndarray], required): List of voltage arrays.
- fileStarter (str, required): File name prefix for saving plots.

Returns:

- None
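
A sketch that collects several solver outputs and plots them (the prefix is illustrative; the method writes <prefix>_PCA.png and <prefix>_MDS.png). Each run must produce the same number of voltages, since the arrays are zipped point-by-point before projection:

finder.metric = None                     # so calculateFor returns (voltages, problem)
runs = []
for c in [-10, -5, -2]:                  # a few log-space kernel parameters
	v, _ = finder.calculateFor(landmarks, data, c, -12)
	runs.append(v)
finder.visualizations(runs, "run1")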


Data

Class for handling and processing data sets.

Source code in code\create_data.py
class Data():
	"""Class for handling and processing data sets."""
	def __init__(self, arg=None, stream=False):
		"""
		Initializes the Data object from a list, file path, or raw data.

		Args:
			arg (Union[list, str, Any]): The input data or path to data file.
			stream (bool): Whether to use streaming mode for large files.
		"""
		self.stream = stream

		if isinstance(arg, list):
			self.data = np.array(arg)
			self.length = len(self.data)
		elif isinstance(arg, str):
			if (stream):
				self.data = self.stream_data_json(arg)
				self.length = next(self.data)
				self.i = 0
			else:
				self.load_data(arg)
				self.length = len(self.data)

			self.input_file = arg
		else:
			self.data = arg
			self.length = len(self.data)

	def __len__(self):
		"""
		Returns the length of the dataset.

		Returns:
			int: The number of data points.
		"""
		return self.length

	def __getitem__(self, index):
		"""
		Allows indexing into the dataset.

		Args:
			index (int): Index of the desired data point.

		Returns:
			np.ndarray: The data point at the given index.
		"""
		if (self.stream):
			if (index < self.i):
				self.data = self.stream_data_json(self.input_file)
				next(self.data)
				self.i = 0

			while (self.i <= index):
				value = next(self.data)
				self.i += 1

			return value
		else:
			return self.data[index]

	def __setitem__(self, index, value):
		"""
		Sets a value in the dataset at a specified index.

		Args:
			index (int): The index to modify.
			value (Any): The new value to set.
		"""
		self.data[index] = value

	def __iter__(self):
		"""
		Returns an iterator over the dataset for use in for-loops.

		Returns:
			Iterator: An iterator over the dataset.
		"""
		if (hasattr(self, 'input_file')):
			self.streaming_data = self.stream_data_json(self.input_file)
			next(self.streaming_data)
		else:
			self.streaming_data = 0

		return self

	def __next__(self):
		"""
		Retrieves the next data point in an iteration.

		Returns:
			np.ndarray: The next data point.

		Raises:
			StopIteration: If the end of the dataset is reached.
		"""
		try:
			if (hasattr(self, 'input_file')):
				return np.array(next(self.streaming_data))
			else:
				# in-memory mode: streaming_data is the integer cursor set by __iter__
				if (self.streaming_data == self.length):
					raise StopIteration
				value = np.array(self.data[self.streaming_data])
				self.streaming_data += 1
				return value
		except StopIteration:
			raise

	def getSubSet(self, indexList):
		"""
		Returns a subset of the data given a list of indices.

		Args:
			indexList (list[int]): List of indices to extract.

		Returns:
			Data: A new Data object containing the selected subset.
		"""
		subset = []
		for index in indexList:
			subset.append(self.data[index])
		return Data(subset)

	def save_data_json(self, output_file):
		"""
		Saves the dataset to a JSON file.

		Args:
			output_file (str): Path to the output file.
		"""
		fg = FileGenerator()
		fg.setGenerator(fg.linear_generator)
		fg.stream_save(output_file, self.data)

	def save_data_pickle(self, output_file):
		"""
		Saves the dataset to a pickle file.

		Args:
			output_file (str): Path to the output file.
		"""
		with open(output_file, 'wb') as f: 
			pickle.dump(self.data, f) 

	def load_data_json(self, input_file):
		"""
		Loads the dataset from a JSON file.

		Args:
			input_file (str): Path to the input file.

		Returns:
			list[np.ndarray]: The loaded data.
		"""
		with open(input_file, 'r') as f:
			self.input_file = input_file

			data = json.load(f)
			self.data = data["data"]
			self.length = data["length"]
			for i, point in enumerate(self.data):
				self.data[i] = np.array(point)

			return self.data

	def load_data_pickle(self, input_file):
		"""
		Loads the dataset from a pickle file.

		Args:
			input_file (str): Path to the input file.

		Returns:
			Any: The loaded data.
		"""
		with open(input_file, 'rb') as f:  # pickle requires binary mode
			self.input_file = input_file
			self.data = pickle.load(f)

			return self.data

	def stream_data_json(self, input_file):
		"""
		Streams data from a JSON file one entry at a time. The first value yielded
		is the dataset length (read backwards from the end of the file); the rest
		are the data points.

		Args:
			input_file (str): Path to the input JSON file.

		Yields:
			Union[int, np.ndarray]: The dataset length, then individual data points.
		"""
		with open(input_file, 'rb') as f:
			f.seek(0, 2)
			position = f.tell()

			value = ""
			read = False
			while position > 0:
				position -= 1
				f.seek(position)
				byte = f.read(1)

				if byte == b' ':
					# print(value)
					yield int(value)
					break

				if (read):
					value = byte.decode() + value

				if byte == b'}':
					read = True

		with open(input_file, 'r') as f:
			f.readline()

			for line in f:
				if ("length" in line):
					break

				data = json.loads(line.strip().split(']')[0] + ']')
				yield np.array(data)

	file_function_pairs = [["json", save_data_json, load_data_json], ["pkl", save_data_pickle, load_data_pickle]]

	def data_function(self, file, save_or_load):
		"""
		Routes file operation to appropriate function based on file extension.

		Args:
			file (str): File path.
			save_or_load (int): 1 for save, 2 for load.

		Returns:
			Optional[Any]: The result of the load operation if applicable.
		"""
		if (file == None):
			return

		for ffp in self.file_function_pairs:
			if file[-len(ffp[0]):] == ffp[0]:
				# entries of file_function_pairs are plain (unbound) functions whose
				# first parameter is the Data instance, so pass `self` when saving too
				if save_or_load == 1:
					ffp[save_or_load](self, file)
				else:
					return ffp[save_or_load](self, file)

	def save_data(self, output_file):
		"""
		Saves the dataset to a file, choosing format by extension.

		Args:
			output_file (str): Path to the output file.

		Returns:
			Data: Self (for chaining).
		"""
		self.data_function(output_file, 1)
		return self

	def load_data(self, input_file):
		"""
		Loads the dataset from a file, choosing format by extension.

		Args:
			input_file (str): Path to the input file.

		Returns:
			Data: Self (for chaining).
		"""
		self.data_function(input_file, 2)
		return self

	def get_random_point(self):
		"""
		Returns a randomly selected point from the dataset.

		Returns:
			np.ndarray: A random data point.
		"""
		return select_random(self.data)

	def plot(self, name=None):
		"""
		Plots the dataset using matplotlib.

		Args:
			name (Optional[str]): File path to save the plot, if specified.
		"""
		Plotter().plotPoints(self.data, name)

	def getNumpy(self):
		"""
		Ensures that the dataset is returned as a NumPy array.

		Returns:
			np.ndarray: Dataset as a NumPy array.
		"""
		if isinstance(self.data, np.ndarray):
			# print(self.data.shape)
			return self.data
		else:
			temp = []
			for x in self.data:
				temp.append(np.array(x))

			# print(np.array(temp).shape)
			return np.array(temp)
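
A short usage sketch (file names are illustrative; streaming assumes the JSON was written in the layout stream_data_json expects, which save_data produces via FileGenerator):

from create_data import Data

d = Data([[0, 0], [1, 1], [2, 0]])            # build from a list
print(len(d), d[1])                           # 3 [1 1]
d.save_data("points.json")                    # format picked by extension (.json or .pkl)

streamed = Data("points.json", stream=True)   # iterate without loading everything
for point in streamed:
	print(point)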

__getitem__(index)

Allows indexing into the dataset.

Parameters:

- index (int, required): Index of the desired data point.

Returns:

- np.ndarray: The data point at the given index.


__init__(arg=None, stream=False)

Initializes the Data object from a list, file path, or raw data.

Parameters:

- arg (Union[list, str, Any], default None): The input data or path to data file.
- stream (bool, default False): Whether to use streaming mode for large files.

__iter__()

Returns an iterator over the dataset for use in for-loops.

Returns:

- Iterator: An iterator over the dataset.


__len__()

Returns the length of the dataset.

Returns:

- int: The number of data points.


__next__()

Retrieves the next data point in an iteration.

Returns:

- np.ndarray: The next data point.

Raises:

- StopIteration: If the end of the dataset is reached.


__setitem__(index, value)

Sets a value in the dataset at a specified index.

Parameters:

- index (int, required): The index to modify.
- value (Any, required): The new value to set.

data_function(file, save_or_load)

Routes file operation to appropriate function based on file extension.

Parameters:

- file (str, required): File path.
- save_or_load (int, required): 1 for save, 2 for load.

Returns:

- Optional[Any]: The result of the load operation if applicable.


getNumpy()

Ensures that the dataset is returned as a NumPy array.

Returns:

- np.ndarray: Dataset as a NumPy array.


getSubSet(indexList)

Returns a subset of the data given a list of indices.

Parameters:

- indexList (list[int], required): List of indices to extract.

Returns:

- Data: A new Data object containing the selected subset.


get_random_point()

Returns a randomly selected point from the dataset.

Returns:

- np.ndarray: A random data point.


load_data(input_file)

Loads the dataset from a file, choosing format by extension.

Parameters:

- input_file (str, required): Path to the input file.

Returns:

- Data: Self (for chaining).


load_data_json(input_file)

Loads the dataset from a JSON file.

Parameters:

- input_file (str, required): Path to the input file.

Returns:

- list[np.ndarray]: The loaded data.


load_data_pickle(input_file)

Loads the dataset from a pickle file.

Parameters:

- input_file (str, required): Path to the input file.

Returns:

- Any: The loaded data.


plot(name=None)

Plots the dataset using matplotlib.

Parameters:

- name (Optional[str], default None): File path to save the plot, if specified.

save_data(output_file)

Saves the dataset to a file, choosing format by extension.

Parameters:

- output_file (str, required): Path to the output file.

Returns:

- Data: Self (for chaining).


save_data_json(output_file)

Saves the dataset to a JSON file.

Parameters:

- output_file (str, required): Path to the output file.

save_data_pickle(output_file)

Saves the dataset to a pickle file.

Parameters:

- output_file (str, required): Path to the output file.

stream_data_json(input_file)

Streams data from a JSON file one entry at a time. The first value yielded is the dataset length (read backwards from the end of the file); subsequent values are the data points.

Parameters:

- input_file (str, required): Path to the input JSON file.

Yields:

- Union[int, np.ndarray]: The dataset length first, then one data point per iteration.
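
Judging from the parsing code, the expected layout is roughly: an opening line, one JSON array per line for each point, and a closing line containing "length" whose numeric value can also be read backwards from the end of the file, e.g. (illustrative):

{"data": [
[0.1, 0.2],
[0.3, 0.4]
], "length": 2}

The actual writer is FileGenerator.stream_save, so treat this as an inference from the reader rather than a format specification.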


DataCreator

A utility class to create various synthetic datasets for testing and analysis. Interfaces with FileGenerator to optionally stream data to file.

Attributes:

- fg (FileGenerator): An instance of FileGenerator used for generating data points.

Source code in code\create_data.py
class DataCreator:
	"""
	A utility class to create various synthetic datasets for testing and analysis.
	Interfaces with FileGenerator to optionally stream data to file.

	Attributes:
		fg (FileGenerator): An instance of FileGenerator used for generating data points.
	"""

	def __init__(self):
		self.fg = FileGenerator()

	def stream_dataset_creator(self, output_file: str, function: callable, seed: int, stream: bool, *args) -> 'Data':
		"""
		Creates a dataset using the specified generator function, supporting streamed or non-streamed output.

		Args:
			output_file (str): File path to save the dataset.
			function (callable): Generator function to create data points.
			seed (int): Random seed for reproducibility.
			stream (bool): If True, streams data directly to the file.
			*args: Additional arguments passed to the generator function.

		Returns:
			Data: The created dataset, either streamed or in-memory.
		"""
		random.seed(seed)

		if stream:
			self.fg.setGenerator(function)
			self.fg.stream_save(output_file, *args)
			data = Data(output_file, stream=True)
		else:
			data = [point for point in function(*args)]
			data = Data(data)
			data.save_data(output_file)

		return data

	def create_dataset_line(self, output_file: str = None, start: float = 0, end: float = 1, points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
		"""
		Creates a 1D line dataset.

		Args:
			output_file (str): File path to save the dataset.
			start (float): Starting point of the line.
			end (float): Ending point of the line.
			points (int): Number of data points.
			seed (int): Random seed.
			stream (bool): Whether to stream to file.

		Returns:
			Data: The generated dataset.
		"""
		return self.stream_dataset_creator(output_file, self.fg.line_generator, seed, stream, start, end, points)

	def create_dataset_square_edge(self, output_file: str = None, p1: tuple = (0, 0), p2: tuple = (1, 1), points: int = 1000, seed: int = 42) -> 'Data':
		"""
		Creates a dataset of points along the edges of a square.

		Args:
			output_file (str): File path to save the dataset.
			p1 (tuple): Bottom-left corner.
			p2 (tuple): Top-right corner.
			points (int): Number of data points.
			seed (int): Random seed.

		Returns:
			Data: The generated dataset.
		"""
		data = []
		random.seed(seed)

		x_diff = p2[0] - p1[0]
		y_diff = p2[1] - p1[1]

		for _ in range(points):
			r = random.random() * 4
			side = int(r)
			var = r - side

			x_side = side % 2
			y_side = side >> 1

			x_rev = 1 - x_side
			y_rev = 1 - y_side

			variation = np.array([var * x_side * x_diff, var * x_rev * y_diff])
			offset = np.array([x_rev * y_side * x_diff, x_side * y_rev * y_diff])
			shift = np.array(p1)

			data.append(variation + offset + shift)

		data = Data(data)
		data.save_data(output_file)
		return data

	def create_dataset_square_fill(self, output_file: str = None, p1: tuple = (0, 0), p2: tuple = (1, 1), points: int = 1000, seed: int = 42) -> 'Data':
		"""
		Creates a dataset of points filling a square area.

		Args:
			output_file (str): File path to save the dataset.
			p1 (tuple): Bottom-left corner.
			p2 (tuple): Top-right corner.
			points (int): Number of data points.
			seed (int): Random seed.

		Returns:
			Data: The generated dataset.
		"""
		data = []
		random.seed(seed)

		x_diff = p2[0] - p1[0]
		y_diff = p2[1] - p1[1]

		for _ in range(points):
			x_rand = random.random()
			y_rand = random.random()
			data.append(np.array([x_diff * x_rand + p1[0], y_diff * y_rand + p1[1]]))

		data = Data(data)
		data.save_data(output_file)
		return data

	def create_dataset_eigth_sphere(self, output_file: str = None, radius: float = 1, x_pos: bool = True, y_pos: bool = True, z_pos: bool = True, points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
		"""
		Creates a dataset on an eighth of a sphere.

		Args:
			output_file (str): File path to save the dataset.
			radius (float): Radius of the sphere.
			x_pos (bool): Use positive x.
			y_pos (bool): Use positive y.
			z_pos (bool): Use positive z.
			points (int): Number of data points.
			seed (int): Random seed.
			stream (bool): Whether to stream to file.

		Returns:
			Data: The generated dataset.
		"""
		return self.stream_dataset_creator(output_file, self.fg.eigth_sphere_generator, seed, stream, radius, x_pos, y_pos, z_pos, points)

	def create_dataset_triangle(self, output_file: str = None, edges: list = [[0, 0], [1, 1], [2, 0]], points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
		"""
		Creates a dataset of points on a triangle.

		Args:
			output_file (str): File path to save the dataset.
			edges (list): Three vertices of the triangle.
			points (int): Number of data points.
			seed (int): Random seed.
			stream (bool): Whether to stream to file.

		Returns:
			Data: The generated dataset.
		"""
		return self.stream_dataset_creator(output_file, self.fg.triangle_generator, seed, stream, edges, points)

	def create_dataset_strong_clusters(self, output_file: str = None, internal_std: float = 1, external_std: float = 10, mean: list = [0, 0], clusters: int = 10, points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
		"""
		Creates a clustered dataset with multiple clusters.

		Args:
			output_file (str): File path to save the dataset.
			internal_std (float): Standard deviation inside a cluster.
			external_std (float): Spread of cluster centers.
			mean (list): Mean location for generating cluster centers.
			clusters (int): Number of clusters.
			points (int): Number of data points.
			seed (int): Random seed.
			stream (bool): Whether to stream to file.

		Returns:
			Data: The generated dataset.
		"""
		data = []
		random.seed(seed)
		np_mean = np.array(mean)

		cluster_centers = [varied_point(np_mean, external_std) for _ in range(clusters)]

		if stream:
			self.fg.setGenerator(self.fg.strong_cluster_generator)
			self.fg.stream_save(output_file, internal_std, cluster_centers, points)
			data = Data(output_file, stream=True)
		else:
			for p in self.fg.strong_cluster_generator(internal_std, cluster_centers, points):
				data.append(p)
			data = Data(data)
			data.save_data(output_file)

		return data

	def rotate_into_dimention(self, data: 'Data', higher_dim: int = 3, seed: int = 42) -> 'Data':
		"""
		Rotates dataset into a higher dimensional space using random rotations.

		Args:
			data (Data): The dataset to rotate.
			higher_dim (int): Dimension to rotate into.
			seed (int): Random seed.

		Returns:
			Data: The rotated dataset.
		"""
		rotation_matrix = np.identity(higher_dim)
		if seed != -1:
			random.seed(seed)

		for x1 in range(higher_dim - 1):
			for x2 in range(x1 + 1, higher_dim):
				angle = 2 * np.pi * random.random()
				rot = np.identity(higher_dim)
				rot[x1, x1] = np.cos(angle)
				rot[x2, x2] = np.cos(angle)
				rot[x1, x2] = np.sin(angle)
				rot[x2, x1] = -np.sin(angle)
				rotation_matrix = np.matmul(rotation_matrix, rot)

		data.data = list(data.data)
		for i in range(len(data)):
			extended = np.zeros(higher_dim)
			extended[:len(data[i])] = data[i]
			data[i] = np.matmul(rotation_matrix, extended)

		data.data = np.array(data.data)
		return data

	def create_dataset_spiral(self, output_file: str = None, radius: float = 1, center: list = [0, 0], rotations: int = 3, height: float = 10, points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
		"""
		Creates a 3D spiral dataset.

		Args:
			output_file (str): File path to save the dataset.
			radius (float): Radius of the spiral.
			center (list): Center offset.
			rotations (int): Number of rotations.
			height (float): Height of the spiral.
			points (int): Number of data points.
			seed (int): Random seed.
			stream (bool): Whether to stream to file.

		Returns:
			Data: The generated dataset.
		"""
		return self.stream_dataset_creator(output_file, self.fg.spiral_generator, seed, stream, radius, center, rotations, height, points)
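
A short usage sketch (output paths are illustrative):

from create_data import DataCreator

dc = DataCreator()
square = dc.create_dataset_square_fill("square.json", p1=(0, 0), p2=(2, 1), points=500)
square.plot("square.png")
tilted = dc.rotate_into_dimention(square, higher_dim=3)   # embed in 3-D and randomly rotate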

create_dataset_eigth_sphere(output_file=None, radius=1, x_pos=True, y_pos=True, z_pos=True, points=1000, seed=42, stream=False)

Creates a dataset on an eighth of a sphere.

Parameters:

- output_file (str, default None): File path to save the dataset.
- radius (float, default 1): Radius of the sphere.
- x_pos (bool, default True): Use positive x.
- y_pos (bool, default True): Use positive y.
- z_pos (bool, default True): Use positive z.
- points (int, default 1000): Number of data points.
- seed (int, default 42): Random seed.
- stream (bool, default False): Whether to stream to file.

Returns:

- Data: The generated dataset.


create_dataset_line(output_file=None, start=0, end=1, points=1000, seed=42, stream=False)

Creates a 1D line dataset.

Parameters:

- output_file (str, default None): File path to save the dataset.
- start (float, default 0): Starting point of the line.
- end (float, default 1): Ending point of the line.
- points (int, default 1000): Number of data points.
- seed (int, default 42): Random seed.
- stream (bool, default False): Whether to stream to file.

Returns:

- Data: The generated dataset.


create_dataset_spiral(output_file=None, radius=1, center=[0, 0], rotations=3, height=10, points=1000, seed=42, stream=False)

Creates a 3D spiral dataset.

Parameters:

- output_file (str, default None): File path to save the dataset.
- radius (float, default 1): Radius of the spiral.
- center (list, default [0, 0]): Center offset.
- rotations (int, default 3): Number of rotations.
- height (float, default 10): Height of the spiral.
- points (int, default 1000): Number of data points.
- seed (int, default 42): Random seed.
- stream (bool, default False): Whether to stream to file.

Returns:

- Data: The generated dataset.


create_dataset_square_edge(output_file=None, p1=(0, 0), p2=(1, 1), points=1000, seed=42)

Creates a dataset of points along the edges of a square.

Parameters:

- output_file (str, default None): File path to save the dataset.
- p1 (tuple, default (0, 0)): Bottom-left corner.
- p2 (tuple, default (1, 1)): Top-right corner.
- points (int, default 1000): Number of data points.
- seed (int, default 42): Random seed.

Returns:

- Data: The generated dataset.
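
The generator draws r uniformly from [0, 4): int(r) selects one of the four edges and the fractional part selects the position along it. For example, with p1 = (0, 0), p2 = (1, 1), and r = 2.4, side = 2 gives x_side = 0 and y_side = 1, so offset = [x_diff, 0] and variation = [0, 0.4 * y_diff], i.e. the point (1, 0.4), 40% of the way up the right edge.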

Source code in code\create_data.py
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
def create_dataset_square_edge(self, output_file: str = None, p1: tuple = (0, 0), p2: tuple = (1, 1), points: int = 1000, seed: int = 42) -> 'Data':
	"""
	Creates a dataset of points along the edges of a square.

	Args:
		output_file (str): File path to save the dataset.
		p1 (tuple): Bottom-left corner.
		p2 (tuple): Top-right corner.
		points (int): Number of data points.
		seed (int): Random seed.

	Returns:
		Data: The generated dataset.
	"""
	data = []
	random.seed(seed)

	x_diff = p2[0] - p1[0]
	y_diff = p2[1] - p1[1]

	for _ in range(points):
		r = random.random() * 4
		side = int(r)
		var = r - side

		x_side = side % 2
		y_side = side >> 1

		x_rev = 1 - x_side
		y_rev = 1 - y_side

		variation = np.array([var * x_side * x_diff, var * x_rev * y_diff])
		offset = np.array([x_rev * y_side * x_diff, x_side * y_rev * y_diff])
		shift = np.array(p1)

		data.append(variation + offset + shift)

	data = Data(data)
	data.save_data(output_file)
	return data
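
The two-bit decomposition of `side` is terse; tracing the four cases (derived directly from the code above, for illustration):

# side = int(r) selects an edge; var = r - side in [0, 1) is the position along it:
#   side 0: x_side=0, y_side=0 -> variation=(0, var*y_diff), offset=(0, 0)
#           point = p1 + (0, var*y_diff)         (left edge, varying y)
#   side 1: x_side=1, y_side=0 -> variation=(var*x_diff, 0), offset=(0, y_diff)
#           point = p1 + (var*x_diff, y_diff)    (top edge, varying x)
#   side 2: x_side=0, y_side=1 -> variation=(0, var*y_diff), offset=(x_diff, 0)
#           point = p1 + (x_diff, var*y_diff)    (right edge, varying y)
#   side 3: x_side=1, y_side=1 -> variation=(var*x_diff, 0), offset=(0, 0)
#           point = p1 + (var*x_diff, 0)         (bottom edge, varying x)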

create_dataset_square_fill(output_file=None, p1=(0, 0), p2=(1, 1), points=1000, seed=42)

Creates a dataset of points filling a square area.

Parameters:

	output_file (str, default None): File path to save the dataset.
	p1 (tuple, default (0, 0)): Bottom-left corner.
	p2 (tuple, default (1, 1)): Top-right corner.
	points (int, default 1000): Number of data points.
	seed (int, default 42): Random seed.

Returns:

	Data: The generated dataset.

Source code in code\create_data.py
def create_dataset_square_fill(self, output_file: str = None, p1: tuple = (0, 0), p2: tuple = (1, 1), points: int = 1000, seed: int = 42) -> 'Data':
	"""
	Creates a dataset of points filling a square area.

	Args:
		output_file (str): File path to save the dataset.
		p1 (tuple): Bottom-left corner.
		p2 (tuple): Top-right corner.
		points (int): Number of data points.
		seed (int): Random seed.

	Returns:
		Data: The generated dataset.
	"""
	data = []
	random.seed(seed)

	x_diff = p2[0] - p1[0]
	y_diff = p2[1] - p1[1]

	for _ in range(points):
		x_rand = random.random()
		y_rand = random.random()
		data.append(np.array([x_diff * x_rand + p1[0], y_diff * y_rand + p1[1]]))

	data = Data(data)
	data.save_data(output_file)
	return data

create_dataset_strong_clusters(output_file=None, internal_std=1, external_std=10, mean=[0, 0], clusters=10, points=1000, seed=42, stream=False)

Creates a clustered dataset with multiple clusters.

Parameters:

	output_file (str, default None): File path to save the dataset.
	internal_std (float, default 1): Standard deviation inside a cluster.
	external_std (float, default 10): Spread of cluster centers.
	mean (list, default [0, 0]): Mean location for generating cluster centers.
	clusters (int, default 10): Number of clusters.
	points (int, default 1000): Number of data points.
	seed (int, default 42): Random seed.
	stream (bool, default False): Whether to stream to file.

Returns:

	Data: The generated dataset.

Source code in code\create_data.py
def create_dataset_strong_clusters(self, output_file: str = None, internal_std: float = 1, external_std: float = 10, mean: list = [0, 0], clusters: int = 10, points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
	"""
	Creates a clustered dataset with multiple clusters.

	Args:
		output_file (str): File path to save the dataset.
		internal_std (float): Standard deviation inside a cluster.
		external_std (float): Spread of cluster centers.
		mean (list): Mean location for generating cluster centers.
		clusters (int): Number of clusters.
		points (int): Number of data points.
		seed (int): Random seed.
		stream (bool): Whether to stream to file.

	Returns:
		Data: The generated dataset.
	"""
	data = []
	random.seed(seed)
	np_mean = np.array(mean)

	cluster_centers = [varied_point(np_mean, external_std) for _ in range(clusters)]

	if stream:
		self.fg.setGenerator(self.fg.strong_cluster_generator)
		self.fg.stream_save(output_file, internal_std, cluster_centers, points)
		data = Data(output_file, stream=True)
	else:
		for p in self.fg.strong_cluster_generator(internal_std, cluster_centers, points):
			data.append(p)
		data = Data(data)
		data.save_data(output_file)

	return data
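
A usage sketch (the DataCreator instance name is illustrative): with stream=True, points are written to the file as they are generated rather than held in memory.

creator = DataCreator()
# 10 Gaussian blobs; the blob centers are themselves N(mean, external_std) samples
clusters = creator.create_dataset_strong_clusters(
	"clusters.json", internal_std=1, external_std=10,
	clusters=10, points=5000, stream=True)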

create_dataset_triangle(output_file=None, edges=[[0, 0], [1, 1], [2, 0]], points=1000, seed=42, stream=False)

Creates a dataset of points on a triangle.

Parameters:

	output_file (str, default None): File path to save the dataset.
	edges (list, default [[0, 0], [1, 1], [2, 0]]): Three vertices of the triangle.
	points (int, default 1000): Number of data points.
	seed (int, default 42): Random seed.
	stream (bool, default False): Whether to stream to file.

Returns:

	Data: The generated dataset.

Source code in code\create_data.py
def create_dataset_triangle(self, output_file: str = None, edges: list = [[0, 0], [1, 1], [2, 0]], points: int = 1000, seed: int = 42, stream: bool = False) -> 'Data':
	"""
	Creates a dataset of points on a triangle.

	Args:
		output_file (str): File path to save the dataset.
		edges (list): Three vertices of the triangle.
		points (int): Number of data points.
		seed (int): Random seed.
		stream (bool): Whether to stream to file.

	Returns:
		Data: The generated dataset.
	"""
	return self.stream_dataset_creator(output_file, self.fg.triangle_generator, seed, stream, edges, points)

rotate_into_dimention(data, higher_dim=3, seed=42)

Rotates dataset into a higher dimensional space using random rotations.

Parameters:

	data (Data, required): The dataset to rotate.
	higher_dim (int, default 3): Dimension to rotate into.
	seed (int, default 42): Random seed.

Returns:

	Data: The rotated dataset.

Source code in code\create_data.py
def rotate_into_dimention(self, data: 'Data', higher_dim: int = 3, seed: int = 42) -> 'Data':
	"""
	Rotates dataset into a higher dimensional space using random rotations.

	Args:
		data (Data): The dataset to rotate.
		higher_dim (int): Dimension to rotate into.
		seed (int): Random seed.

	Returns:
		Data: The rotated dataset.
	"""
	rotation_matrix = np.identity(higher_dim)
	if seed != -1:
		random.seed(seed)

	for x1 in range(higher_dim - 1):
		for x2 in range(x1 + 1, higher_dim):
			angle = 2 * np.pi * random.random()
			rot = np.identity(higher_dim)
			rot[x1, x1] = np.cos(angle)
			rot[x2, x2] = np.cos(angle)
			rot[x1, x2] = np.sin(angle)
			rot[x2, x1] = -np.sin(angle)
			rotation_matrix = np.matmul(rotation_matrix, rot)

	data.data = list(data.data)
	for i in range(len(data)):
		extended = np.zeros(higher_dim)
		extended[:len(data[i])] = data[i]
		data[i] = np.matmul(rotation_matrix, extended)

	data.data = np.array(data.data)
	return data
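
Since the matrix built above is a product of Givens (plane) rotations, it is orthogonal, so the embedding preserves norms and pairwise distances. A small self-contained check of that property (independent of the Data class):

import random
import numpy as np

random.seed(0)
dim = 5
R = np.identity(dim)
for x1 in range(dim - 1):
	for x2 in range(x1 + 1, dim):
		angle = 2 * np.pi * random.random()
		rot = np.identity(dim)
		rot[x1, x1] = rot[x2, x2] = np.cos(angle)
		rot[x1, x2], rot[x2, x1] = np.sin(angle), -np.sin(angle)
		R = R @ rot

# orthogonality: R @ R.T == I, so lengths are preserved
assert np.allclose(R @ R.T, np.identity(dim))
v = np.array([1.0, 2.0, 0.0, 0.0, 0.0])
assert np.isclose(np.linalg.norm(R @ v), np.linalg.norm(v))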

stream_dataset_creator(output_file, function, seed, stream, *args)

Creates a dataset using the specified generator function, supporting streamed or non-streamed output.

Parameters:

	output_file (str, required): File path to save the dataset.
	function (callable, required): Generator function to create data points.
	seed (int, required): Random seed for reproducibility.
	stream (bool, required): If True, streams data directly to the file.
	*args: Additional arguments passed to the generator function.

Returns:

	Data: The created dataset, either streamed or in-memory.

Source code in code\create_data.py
def stream_dataset_creator(self, output_file: str, function: callable, seed: int, stream: bool, *args) -> 'Data':
	"""
	Creates a dataset using the specified generator function, supporting streamed or non-streamed output.

	Args:
		output_file (str): File path to save the dataset.
		function (callable): Generator function to create data points.
		seed (int): Random seed for reproducibility.
		stream (bool): If True, streams data directly to the file.
		*args: Additional arguments passed to the generator function.

	Returns:
		Data: The created dataset, either streamed or in-memory.
	"""
	random.seed(seed)

	if stream:
		self.fg.setGenerator(function)
		self.fg.stream_save(output_file, *args)
		data = Data(output_file, stream=True)
	else:
		data = [point for point in function(*args)]
		data = Data(data)
		data.save_data(output_file)

	return data
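
A generator here is just any callable that yields points, so custom shapes can reuse this plumbing. A hypothetical sketch (`my_circle_generator` and the `creator` instance are illustrative, not part of the library):

import random
import numpy as np

def my_circle_generator(radius, points):
	# yields 2D points uniformly distributed on a circle of the given radius
	for _ in range(points):
		t = 2 * np.pi * random.random()
		yield np.array([radius * np.cos(t), radius * np.sin(t)])

# non-streamed: points are materialized in memory, wrapped in Data, then saved
# circle = creator.stream_dataset_creator("circle.json", my_circle_generator, 42, False, 1.0, 500)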

FileGenerator

Generates files for saved data.

This class is designed to assist in saving generated datasets in a streaming fashion. It provides several built-in generators to create synthetic datasets for use with Data and DataCreator classes.

Source code in code\create_data.py
class FileGenerator:
	"""
	Generates files for saved data.

	This class is designed to assist in saving generated datasets in a streaming
	fashion. It provides several built-in generators to create synthetic datasets
	for use with `Data` and `DataCreator` classes.
	"""

	def __init__(self):
		"""Initializes the FileGenerator."""
		pass

	def setGenerator(self, fn):
		"""
		Sets the generator function to be used when saving data.

		Args:
			fn (Callable): A generator function that yields data points.
		"""
		self.data_generator = fn

	def stream_save(self, output_file: str, *args):
		"""
		Saves data to a JSON file in a streaming manner.

		Args:
			output_file (str): Path to the file where data will be saved.
			*args: Arguments to pass to the generator function.

		Returns:
			None
		"""
		with open(output_file, "w") as f:
			f.write("{\"data\": [\n")
			first = True
			length = 0
			for array in self.data_generator(*args):
				if not first:
					f.write(", \n")
				json.dump(list(array), f)
				length += 1
				first = False
			f.write("], \n\"length\": " + str(length) + "}")

	def linear_generator(self, data: np.ndarray):
		"""
		Yields data points one by one from a NumPy array.

		Args:
			data (np.ndarray): Input data.

		Yields:
			np.ndarray: Single data points from the array.
		"""
		for d in data.tolist():
			yield d

	def line_generator(self, start: float, end: float, points: int):
		"""
		Generates points along a line in 1D space.

		Args:
			start (float): Starting point of the line.
			end (float): Ending point of the line.
			points (int): Number of points to generate.

		Yields:
			np.ndarray: Single-point arrays sampled along the line.
		"""
		for _ in range(points):
			yield np.array([random.random() * (end - start) + start])

	def eigth_sphere_generator(self, radius: float, x_pos: int, y_pos: int, z_pos: int, points: int):
		"""
		Generates points on an eighth of a sphere surface.

		Args:
			radius (float): Radius of the sphere.
			x_pos (int): Hemisphere direction for X (0 or 1).
			y_pos (int): Hemisphere direction for Y (0 or 1).
			z_pos (int): Hemisphere direction for Z (0 or 1).
			points (int): Number of points to generate.

		Yields:
			np.ndarray: Points on the eighth sphere surface.
		"""
		for _ in range(points):
			z = random.random()
			angleXY = np.pi * random.random() / 2
			yield np.array([
				radius * np.sqrt(1 - z**2) * np.cos(angleXY) * (2 * x_pos - 1),
				radius * np.sqrt(1 - z**2) * np.sin(angleXY) * (2 * y_pos - 1),
				radius * z * (2 * z_pos - 1)
			])

	def triangle_generator(self, edges: list, points: int):
		"""
		Generates points uniformly within a triangle defined by three vertices.

		Args:
			edges (list): A list of three points (each a list or np.ndarray) defining the triangle.
			points (int): Number of points to generate.

		Yields:
			np.ndarray: Points uniformly sampled inside the triangle.
		"""
		base = np.array(edges[0])
		edgeDiff1 = np.array(edges[1]) - base
		edgeDiff2 = np.array(edges[2]) - base
		for _ in range(points):
			d1 = random.random()
			d2 = random.random()
			if d1 + d2 > 1:
				d1 = 1 - d1
				d2 = 1 - d2
			yield base + d1 * edgeDiff1 + d2 * edgeDiff2

	def strong_cluster_generator(self, internal_std: float, cluster_centers: list, points: int):
		"""
		Generates clustered points around multiple centers with specified standard deviation.

		Args:
			internal_std (float): Standard deviation within each cluster.
			cluster_centers (list): A list of cluster center points.
			points (int): Number of points to generate.

		Yields:
			np.ndarray: Points sampled from the clusters.
		"""
		c = -1	# percent-progress counter; incremented below but otherwise unused
		for p in range(points):
			if (p / points >= c / 100):
				c += 1
			yield varied_point(select_random(cluster_centers), internal_std)

	def spiral_generator(self, radius: float, center: list, rotations: int, height: float, points: int):
		"""
		Generates points forming a 3D spiral (helix).

		Args:
			radius (float): Radius of the spiral.
			center (list): Center offset of the spiral (not used directly in current implementation).
			rotations (int): Number of full 360° turns.
			height (float): Total height of the spiral.
			points (int): Number of points to generate.

		Yields:
			np.ndarray: Points along the spiral.
		"""
		line = 2 * np.pi * rotations
		heightPerRadian = height / line
		for _ in range(points):
			d = random.random() * line
			yield np.array([
				radius * np.cos(d),
				radius * np.sin(d),
				heightPerRadian * d
			])

__init__()

Initializes the FileGenerator.

Source code in code\create_data.py
def __init__(self):
	"""Initializes the FileGenerator."""
	pass

eigth_sphere_generator(radius, x_pos, y_pos, z_pos, points)

Generates points on an eighth of a sphere surface.

Parameters:

	radius (float, required): Radius of the sphere.
	x_pos (int, required): Hemisphere direction for X (0 or 1).
	y_pos (int, required): Hemisphere direction for Y (0 or 1).
	z_pos (int, required): Hemisphere direction for Z (0 or 1).
	points (int, required): Number of points to generate.

Yields:

	np.ndarray: Points on the eighth sphere surface.

Source code in code\create_data.py
def eigth_sphere_generator(self, radius: float, x_pos: int, y_pos: int, z_pos: int, points: int):
	"""
	Generates points on an eighth of a sphere surface.

	Args:
		radius (float): Radius of the sphere.
		x_pos (int): Hemisphere direction for X (0 or 1).
		y_pos (int): Hemisphere direction for Y (0 or 1).
		z_pos (int): Hemisphere direction for Z (0 or 1).
		points (int): Number of points to generate.

	Yields:
		np.ndarray: Points on the eighth sphere surface.
	"""
	for _ in range(points):
		z = random.random()
		angleXY = np.pi * random.random() / 2
		yield np.array([
			radius * np.sqrt(1 - z**2) * np.cos(angleXY) * (2 * x_pos - 1),
			radius * np.sqrt(1 - z**2) * np.sin(angleXY) * (2 * y_pos - 1),
			radius * z * (2 * z_pos - 1)
		])

line_generator(start, end, points)

Generates points along a line in 1D space.

Parameters:

	start (float, required): Starting point of the line.
	end (float, required): Ending point of the line.
	points (int, required): Number of points to generate.

Yields:

	np.ndarray: Single-point arrays sampled along the line.

Source code in code\create_data.py
def line_generator(self, start: float, end: float, points: int):
	"""
	Generates points along a line in 1D space.

	Args:
		start (float): Starting point of the line.
		end (float): Ending point of the line.
		points (int): Number of points to generate.

	Yields:
		np.ndarray: Single-point arrays sampled along the line.
	"""
	for _ in range(points):
		yield np.array([random.random() * (end - start) + start])

linear_generator(data)

Yields data points one by one from a NumPy array.

Parameters:

	data (np.ndarray, required): Input data.

Yields:

	np.ndarray: Single data points from the array.

Source code in code\create_data.py
def linear_generator(self, data: np.ndarray):
	"""
	Yields data points one by one from a NumPy array.

	Args:
		data (np.ndarray): Input data.

	Yields:
		np.ndarray: Single data points from the array.
	"""
	for d in data.tolist():
		yield d

setGenerator(fn)

Sets the generator function to be used when saving data.

Parameters:

	fn (Callable, required): A generator function that yields data points.
Source code in code\create_data.py
def setGenerator(self, fn):
	"""
	Sets the generator function to be used when saving data.

	Args:
		fn (Callable): A generator function that yields data points.
	"""
	self.data_generator = fn

spiral_generator(radius, center, rotations, height, points)

Generates points forming a 3D spiral (helix).

Parameters:

	radius (float, required): Radius of the spiral.
	center (list, required): Center offset of the spiral (not used directly in the current implementation).
	rotations (int, required): Number of full 360° turns.
	height (float, required): Total height of the spiral.
	points (int, required): Number of points to generate.

Yields:

	np.ndarray: Points along the spiral.

Source code in code\create_data.py
def spiral_generator(self, radius: float, center: list, rotations: int, height: float, points: int):
	"""
	Generates points forming a 3D spiral (helix).

	Args:
		radius (float): Radius of the spiral.
		center (list): Center offset of the spiral (not used directly in current implementation).
		rotations (int): Number of full 360° turns.
		height (float): Total height of the spiral.
		points (int): Number of points to generate.

	Yields:
		np.ndarray: Points along the spiral.
	"""
	line = 2 * np.pi * rotations
	heightPerRadian = height / line
	for _ in range(points):
		d = random.random() * line
		yield np.array([
			radius * np.cos(d),
			radius * np.sin(d),
			heightPerRadian * d
		])

stream_save(output_file, *args)

Saves data to a JSON file in a streaming manner.

Parameters:

	output_file (str, required): Path to the file where data will be saved.
	*args: Arguments to pass to the generator function.

Returns:

	None

Source code in code\create_data.py
def stream_save(self, output_file: str, *args):
	"""
	Saves data to a JSON file in a streaming manner.

	Args:
		output_file (str): Path to the file where data will be saved.
		*args: Arguments to pass to the generator function.

	Returns:
		None
	"""
	with open(output_file, "w") as f:
		f.write("{\"data\": [\n")
		first = True
		length = 0
		for array in self.data_generator(*args):
			if not first:
				f.write(", \n")
			json.dump(list(array), f)
			length += 1
			first = False
		f.write("], \n\"length\": " + str(length) + "}")

strong_cluster_generator(internal_std, cluster_centers, points)

Generates clustered points around multiple centers with specified standard deviation.

Parameters:

	internal_std (float, required): Standard deviation within each cluster.
	cluster_centers (list, required): A list of cluster center points.
	points (int, required): Number of points to generate.

Yields:

	np.ndarray: Points sampled from the clusters.

Source code in code\create_data.py
def strong_cluster_generator(self, internal_std: float, cluster_centers: list, points: int):
	"""
	Generates clustered points around multiple centers with specified standard deviation.

	Args:
		internal_std (float): Standard deviation within each cluster.
		cluster_centers (list): A list of cluster center points.
		points (int): Number of points to generate.

	Yields:
		np.ndarray: Points sampled from the clusters.
	"""
	c = -1	# percent-progress counter; incremented below but otherwise unused
	for p in range(points):
		if (p / points >= c / 100):
			c += 1
		yield varied_point(select_random(cluster_centers), internal_std)

triangle_generator(edges, points)

Generates points uniformly within a triangle defined by three vertices.

Parameters:

	edges (list, required): A list of three points (each a list or np.ndarray) defining the triangle.
	points (int, required): Number of points to generate.

Yields:

	np.ndarray: Points uniformly sampled inside the triangle.

Source code in code\create_data.py
def triangle_generator(self, edges: list, points: int):
	"""
	Generates points uniformly within a triangle defined by three vertices.

	Args:
		edges (list): A list of three points (each a list or np.ndarray) defining the triangle.
		points (int): Number of points to generate.

	Yields:
		np.ndarray: Points uniformly sampled inside the triangle.
	"""
	base = np.array(edges[0])
	edgeDiff1 = np.array(edges[1]) - base
	edgeDiff2 = np.array(edges[2]) - base
	for _ in range(points):
		d1 = random.random()
		d2 = random.random()
		if d1 + d2 > 1:
			d1 = 1 - d1
			d2 = 1 - d2
		yield base + d1 * edgeDiff1 + d2 * edgeDiff2
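
The `d1 + d2 > 1` branch is the standard reflection trick for uniform triangle sampling: (d1, d2) is uniform on the unit square, and draws falling above the diagonal are mapped to (1 - d1, 1 - d2), folding them back into the lower triangle without distorting the density. A quick sanity check of the invariant:

import random

for _ in range(10000):
	d1, d2 = random.random(), random.random()
	if d1 + d2 > 1:
		d1, d2 = 1 - d1, 1 - d2
	# after folding, the pair always lies in the unit simplex
	assert 0 <= d1 + d2 <= 1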

Plotter

Graphs the data in different formats.

Source code in code\create_data.py
class Plotter:
	"""
	Graphs the data in different formats.
	"""

	def pointFormatting(self, points: list[np.ndarray]) -> tuple[list[float], list[float], Optional[list[float]]]:
		"""
		Formats points into separate coordinate lists for plotting.

		Args:
			points (list[np.ndarray]): A list of points as NumPy arrays.

		Returns:
			tuple: x, y, and optionally z coordinate lists.
		"""
		size = len(points[0])
		x_coords = [point[0] for point in points]
		z_coords = None
		if size > 1:
			y_coords = [point[1] for point in points]
			if size > 2:
				z_coords = [point[2] for point in points]
		else:
			y_coords = [0 for point in points]
		return (x_coords, y_coords, z_coords)

	def plotPoints(self, points: list[np.ndarray], name: Optional[str] = None) -> None:
		"""
		Plots a single set of points in 2D or 3D.

		Args:
			points (list[np.ndarray]): A list of points to plot.
			name (Optional[str]): Optional filename to save the plot.
		"""
		self.plotPointSets([points], name)

	def plotPointSets(self, sets: list[list[np.ndarray]], name: Optional[str] = None) -> None:
		"""
		Plots multiple sets of points in different colors.

		Args:
			sets (list[list[np.ndarray]]): A list of point sets.
			name (Optional[str]): Optional filename to save the plot.
		"""
		markers = ['o', 'v', '*']
		color = ['r', 'g', 'b']
		size = len(sets[0][0])
		fig = plt.figure()
		if size == 3:
			ax = fig.add_subplot(111, projection='3d')
		else:
			ax = fig.add_subplot(111)
		for i, points in enumerate(sets):
			(x_coords, y_coords, z_coords) = self.pointFormatting(points)
			if size == 3:
				ax.scatter(x_coords, y_coords, z_coords, c=color[i], marker=markers[i], label='Points')
			else:
				ax.scatter(x_coords, y_coords, c=color[i], marker=markers[i], label='Points')
		ax.legend()
		if name:
			plt.savefig(name)
		plt.show()

	def voltage_plot(
		self,
		solver,
		color: str = 'r',
		ax = None,
		show: bool = True,
		label: str = "",
		colored: bool = False,
		name: Optional[str] = None
	):
		"""
		Plots voltage data overlaid on input data using optional PCA projection.

		Args:
			solver: A voltage solver instance with `.problem.data` and `.voltages`.
			color (str): Color for the points if `colored` is False.
			ax: Matplotlib axis to plot on (if provided).
			show (bool): Whether to show the plot.
			label (str): Label for the legend.
			colored (bool): Whether to color the points by voltage values.
			name (Optional[str]): Optional filename to save the plot.

		Returns:
			The axis with the plotted data.
		"""
		dim = len(solver.problem.data[0])

		if ax is None:
			fig = plt.figure()
			if (dim + (not colored)) == 3:
				ax = fig.add_subplot(111, projection="3d")
			else:
				ax = fig.add_subplot(111)

		if dim > 3:
			pca = PCA(n_components=2)
			points_2d = pca.fit_transform(solver.problem.data)
			x_coords, y_coords, z_coords = points_2d[:, 0], points_2d[:, 1], None
			dim = 2
		else:
			x_coords, y_coords, z_coords = self.pointFormatting(solver.problem.data)

		cmap = None
		c = color
		args = [x_coords, y_coords, z_coords][:dim]
		args.append(solver.voltages)

		if colored:
			cmap = 'viridis'
			c = solver.voltages
			args = args[:-1]

		ax.scatter(*args, c=c, cmap=cmap, marker='o', label=label)

		if name:
			plt.savefig(name)
		if show:
			plt.show()

		return ax

plotPointSets(sets, name=None)

Plots multiple sets of points in different colors.

Parameters:

	sets (list[list[np.ndarray]], required): A list of point sets.
	name (Optional[str], default None): Optional filename to save the plot.
Source code in code\create_data.py
def plotPointSets(self, sets: list[list[np.ndarray]], name: Optional[str] = None) -> None:
	"""
	Plots multiple sets of points in different colors.

	Args:
		sets (list[list[np.ndarray]]): A list of point sets.
		name (Optional[str]): Optional filename to save the plot.
	"""
	markers = ['o', 'v', '*']
	color = ['r', 'g', 'b']
	size = len(sets[0][0])
	fig = plt.figure()
	if size == 3:
		ax = fig.add_subplot(111, projection='3d')
	else:
		ax = fig.add_subplot(111)
	for i, points in enumerate(sets):
		(x_coords, y_coords, z_coords) = self.pointFormatting(points)
		if size == 3:
			ax.scatter(x_coords, y_coords, z_coords, c=color[i], marker=markers[i], label='Points')
		else:
			ax.scatter(x_coords, y_coords, c=color[i], marker=markers[i], label='Points')
	ax.legend()
	if name:
		plt.savefig(name)
	plt.show()

plotPoints(points, name=None)

Plots a single set of points in 2D or 3D.

Parameters:

	points (list[np.ndarray], required): A list of points to plot.
	name (Optional[str], default None): Optional filename to save the plot.
Source code in code\create_data.py
def plotPoints(self, points: list[np.ndarray], name: Optional[str] = None) -> None:
	"""
	Plots a single set of points in 2D or 3D.

	Args:
		points (list[np.ndarray]): A list of points to plot.
		name (Optional[str]): Optional filename to save the plot.
	"""
	self.plotPointSets([points], name)

pointFormatting(points)

Formats points into separate coordinate lists for plotting.

Parameters:

	points (list[np.ndarray], required): A list of points as NumPy arrays.

Returns:

	tuple[list[float], list[float], Optional[list[float]]]: x, y, and optionally z coordinate lists.

Source code in code\create_data.py
def pointFormatting(self, points: list[np.ndarray]) -> tuple[list[float], list[float], Optional[list[float]]]:
	"""
	Formats points into separate coordinate lists for plotting.

	Args:
		points (list[np.ndarray]): A list of points as NumPy arrays.

	Returns:
		tuple: x, y, and optionally z coordinate lists.
	"""
	size = len(points[0])
	x_coords = [point[0] for point in points]
	z_coords = None
	if size > 1:
		y_coords = [point[1] for point in points]
		if size > 2:
			z_coords = [point[2] for point in points]
	else:
		y_coords = [0 for point in points]
	return (x_coords, y_coords, z_coords)

voltage_plot(solver, color='r', ax=None, show=True, label='', colored=False, name=None)

Plots voltage data overlaid on input data using optional PCA projection.

Parameters:

	solver (required): A voltage solver instance with .problem.data and .voltages.
	color (str, default 'r'): Color for the points if colored is False.
	ax (default None): Matplotlib axis to plot on (if provided).
	show (bool, default True): Whether to show the plot.
	label (str, default ''): Label for the legend.
	colored (bool, default False): Whether to color the points by voltage values.
	name (Optional[str], default None): Optional filename to save the plot.

Returns:

	The axis with the plotted data.

Source code in code\create_data.py
def voltage_plot(
	self,
	solver,
	color: str = 'r',
	ax = None,
	show: bool = True,
	label: str = "",
	colored: bool = False,
	name: Optional[str] = None
):
	"""
	Plots voltage data overlaid on input data using optional PCA projection.

	Args:
		solver: A voltage solver instance with `.problem.data` and `.voltages`.
		color (str): Color for the points if `colored` is False.
		ax: Matplotlib axis to plot on (if provided).
		show (bool): Whether to show the plot.
		label (str): Label for the legend.
		colored (bool): Whether to color the points by voltage values.
		name (Optional[str]): Optional filename to save the plot.

	Returns:
		The axis with the plotted data.
	"""
	dim = len(solver.problem.data[0])

	if ax is None:
		fig = plt.figure()
		if (dim + (not colored)) == 3:
			ax = fig.add_subplot(111, projection="3d")
		else:
			ax = fig.add_subplot(111)

	if dim > 3:
		pca = PCA(n_components=2)
		points_2d = pca.fit_transform(solver.problem.data)
		x_coords, y_coords, z_coords = points_2d[:, 0], points_2d[:, 1], None
		dim = 2
	else:
		x_coords, y_coords, z_coords = self.pointFormatting(solver.problem.data)

	cmap = None
	c = color
	args = [x_coords, y_coords, z_coords][:dim]
	args.append(solver.voltages)

	if colored:
		cmap = 'viridis'
		c = solver.voltages
		args = args[:-1]

	ax.scatter(*args, c=c, cmap=cmap, marker='o', label=label)

	if name:
		plt.savefig(name)
	if show:
		plt.show()

	return ax
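
A usage sketch, assuming `solver` is any object exposing .problem.data (the points) and .voltages (one value per point), as the docstring requires; the names here are illustrative:

plotter = Plotter()
# color each point by its solved voltage and save the figure without displaying it
ax = plotter.voltage_plot(solver, colored=True, show=False, name="voltages.png")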

dimentional_variation(dimentions)

Returns a NumPy array of random values from a standard normal distribution.

Parameters:

	dimentions (int, required): Number of dimensions/values to return.

Returns:

	np.ndarray: Array of random values sampled from the standard normal distribution.

Source code in code\create_data.py
def dimentional_variation(dimentions: int) -> np.ndarray:
	"""
	Returns a NumPy array of random values from a standard normal distribution.

	Args:
		dimentions (int): Number of dimensions/values to return.

	Returns:
		np.ndarray: Array of random values sampled from the standard normal distribution.
	"""
	z_vals = []
	for d in range(dimentions):
		z_vals.append(stats.norm.ppf(random.random()))

	return np.array(z_vals)
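
stats.norm.ppf(random.random()) is inverse-transform sampling: pushing a Uniform(0, 1) draw through the normal quantile function yields a standard normal draw, which keeps all randomness on the module-level random generator (and thus under random.seed). With NumPy's own generator this would be the one-liner below, at the cost of a second seed to manage:

import numpy as np

rng = np.random.default_rng(42)
z_vals = rng.standard_normal(3)  # same distribution, different RNG stream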

select_random(array)

Selects a random element from an array.

Parameters:

	array (list, required): The array to select from.

Returns:

	Any: A random element from the array.

Source code in code\create_data.py
def select_random(array: list) -> any:
	"""
	Selects a random element from an array.

	Args:
		array (list): The array to select from.

	Returns:
		Any: A random element from the array.
	"""
	return array[int(len(array) * random.random())]

varied_point(mean, std)

Returns a point that is randomly offset from the mean based on standard deviation.

Parameters:

	mean (np.ndarray, required): The mean location of the point.
	std (float, required): Standard deviation to apply.

Returns:

	np.ndarray: A randomly varied point.

Source code in code\create_data.py
def varied_point(mean: np.ndarray, std: float) -> np.ndarray:
	"""
	Returns a point that is randomly offset from the mean based on standard deviation.

	Args:
		mean (np.ndarray): The mean location of the point.
		std (float): Standard deviation to apply.

	Returns:
		np.ndarray: A randomly varied point.
	"""
	return mean + std * dimentional_variation(len(mean))

Partitions

Bases: DistanceBased

Using K-means to partition a large dataset

Source code in code\kmeans.py
class Partitions(DistanceBased):
	"""Using K-means to partition a large dataset"""
	def __init__(self, data):
		self.data = data
		super().__init__()

	def k_means_plus_plus(self, k):
		"""The old k-means++ algorithm before using sci-kit"""

		# print(self.data.data)
		self.centers = [create_data.select_random(self.data)]

		for i in range(k - 1):
			distances = []

			for point in self.data:
				# print(type(point))
				# print(type(self.centers[0]))

				# print(point)
				# print(self.centers[0])

				d = self.distance(point, self.centers[0])
				for center in self.centers:
					d = min(d, self.distance(point, center))

				distances.append(d)

			distances = np.array(distances)
			distances /= np.sum(distances)

			self.centers.append(weighted_random(self.data, distances))

		return self.centers

	def k_means(self, k, seed=42, savePointAssignments=False):
		"""Runs k-means and saves the centers and point counts. With option to save pointAssignments for voronoi drawing"""
		if (seed == -1):
			kmeans = KMeans(n_clusters=k, init="k-means++").fit(self.data)
		else:
			kmeans = KMeans(n_clusters=k, random_state=int(seed), init="k-means++", n_init=1).fit(self.data)

		self.k = k
		self.centers = kmeans.cluster_centers_
		self.point_counts = np.bincount(kmeans.labels_).tolist()

		if savePointAssignments:
			self.point_assignments = [[] for i in range(k)]
			for i, point in enumerate(self.data):	# fixed: was enumerate(data); `data` is undefined in this scope
				label = kmeans.labels_[i]

				# print(point)
				# print(self.centers[label])
				# print(self.distance(point, self.centers[label]))
				self.point_assignments[label].append([point, self.distance(point, self.centers[label])])

			# self.point_assignments = [data[kmeans.labels_ == i] for i in range(k)]	# k times less efficient
		# self.voronoi = Voronoi(self.centers)

	def my_k_means(self, k, seed=42, savePointAssignments=False):
		"""The old k-means algorithm"""

		if (seed != -1):
			random.seed(seed)

		self.centers = self.k_means_plus_plus(k)

		point_accumulator = [np.zeros(len(self.data[0])) for i in range(k)]
		point_counts = [0 for i in range(k)]

		if (savePointAssignments):														# This removes the benefit of streaming
			self.point_assignments = [[] for i in range(k)]

		for i, point in enumerate(self.data):
			min_index = 0
			min_dist = self.distance(point, self.centers[0])

			for c in range(k - 1):
				dist = self.distance(point, self.centers[c + 1])
				if (min_dist > dist):
					min_index = c + 1
					min_dist = dist

			if (savePointAssignments):
				self.point_assignments[min_index].append([point, min_dist])

			point_accumulator[min_index] += point
			point_counts[min_index] += 1

		updated_centers = []
		self.point_counts = []

		for acc, count in zip(point_accumulator, point_counts):
			if (count != 0):
				updated_centers.append(acc / count)
				self.point_counts.append(count)

		self.centers = updated_centers
		self.voronoi = Voronoi(self.centers)

	def getClosestPoints(self, index):
		"""
		Finds all points whose nearest center is the center at the given index.

		Args:
			index (int): The index of the center.

		Returns:
			List[int]: Indices of all points whose nearest center is centers[index].

		"""
		closest = []
		for i, point in enumerate(self.data):
			min_index = 0
			min_dist = self.distance(point, self.centers[0])

			for c in range(len(self.centers) - 1):
				dist = self.distance(point, self.centers[c + 1])
				if (min_dist > dist):
					min_index = c + 1
					min_dist = dist

			if (min_index == index):
				closest.append(i)

		return closest

	def plot(self, color='r', marker='o', ax=None, name=None):
		"""Plot the kmeans"""
		plot = create_data.Plotter()

		size = len(self.centers[0])

		if (ax == None):
			fig = plt.figure()

			if (size == 3):
				ax = fig.add_subplot(111, projection='3d')
			else:
				ax = fig.add_subplot(111)

		if (size == 3):
			(x_coords, y_coords, z_coords) = plot.pointFormatting(self.centers)
			ax.scatter(x_coords, y_coords, z_coords, c=color, marker=marker, label='Centers')
		else:
			(x_coords, y_coords, z_coords) = plot.pointFormatting(self.data)
			ax.scatter(x_coords, y_coords, c=color, marker=marker, label='Points')

			# voronoi_plot_2d(self.voronoi, ax=ax, show_vertices=False, line_colors='blue', line_width=1, line_alpha=0.6)

		ax.legend()

		if (name):
			plt.savefig(name)

		plt.show()
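
A minimal usage sketch (assuming `data` is array-like of shape (n, d), as KMeans.fit requires):

partitions = Partitions(data)
partitions.k_means(k=10, seed=42)
print(partitions.centers.shape)       # (10, d): one center per cluster
print(sum(partitions.point_counts))   # n: every point is assigned to one center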

getClosestPoints(index)

Finds all points whose nearest center is the center at the given index.

Parameters:

	index (int, required): The index of the center.

Returns:

	List[int]: Indices of all points whose nearest center is centers[index].

Source code in code\kmeans.py
def getClosestPoints(self, index):
	"""
	Finds all points whose nearest center is the center at the given index.

	Args:
		index (int): The index of the center.

	Returns:
		List[int]: Indices of all points whose nearest center is centers[index].

	"""
	closest = []
	for i, point in enumerate(self.data):
		min_index = 0
		min_dist = self.distance(point, self.centers[0])

		for c in range(len(self.centers) - 1):
			dist = self.distance(point, self.centers[c + 1])
			if (min_dist > dist):
				min_index = c + 1
				min_dist = dist

		if (min_index == index):
			closest.append(i)

	return closest

k_means(k, seed=42, savePointAssignments=False)

Runs k-means and saves the centers and point counts. With option to save pointAssignments for voronoi drawing

Source code in code\kmeans.py
def k_means(self, k, seed=42, savePointAssignments=False):
	"""Runs k-means and saves the centers and point counts. With option to save pointAssignments for voronoi drawing"""
	if (seed == -1):
		kmeans = KMeans(n_clusters=k, init="k-means++").fit(self.data)
	else:
		kmeans = KMeans(n_clusters=k, random_state=int(seed), init="k-means++", n_init=1).fit(self.data)

	self.k = k
	self.centers = kmeans.cluster_centers_
	self.point_counts = np.bincount(kmeans.labels_).tolist()

	if savePointAssignments:
		self.point_assignments = [[] for i in range(k)]
		for i, point in enumerate(self.data):	# fixed: was enumerate(data); `data` is undefined in this scope
			label = kmeans.labels_[i]

			# print(point)
			# print(self.centers[label])
			# print(self.distance(point, self.centers[label]))
			self.point_assignments[label].append([point, self.distance(point, self.centers[label])])

k_means_plus_plus(k)

The old k-means++ algorithm, kept from before the switch to scikit-learn

Source code in code\kmeans.py
def k_means_plus_plus(self, k):
	"""The old k-means++ algorithm before using sci-kit"""

	# print(self.data.data)
	self.centers = [create_data.select_random(self.data)]

	for i in range(k - 1):
		distances = []

		for point in self.data:
			# print(type(point))
			# print(type(self.centers[0]))

			# print(point)
			# print(self.centers[0])

			d = self.distance(point, self.centers[0])
			for center in self.centers:
				d = min(d, self.distance(point, center))

			distances.append(d)

		distances = np.array(distances)
		distances /= np.sum(distances)

		self.centers.append(weighted_random(self.data, distances))

	return self.centers

my_k_means(k, seed=42, savePointAssignments=False)

The old k-means algorithm

Source code in code\kmeans.py
def my_k_means(self, k, seed=42, savePointAssignments=False):
	"""The old k-means algorithm"""

	if (seed != -1):
		random.seed(seed)

	self.centers = self.k_means_plus_plus(k)

	point_accumulator = [np.zeros(len(self.data[0])) for i in range(k)]
	point_counts = [0 for i in range(k)]

	if (savePointAssignments):														# This removes the benefit of streaming
		self.point_assignments = [[] for i in range(k)]

	for i, point in enumerate(self.data):
		min_index = 0
		min_dist = self.distance(point, self.centers[0])

		for c in range(k - 1):
			dist = self.distance(point, self.centers[c + 1])
			if (min_dist > dist):
				min_index = c + 1
				min_dist = dist

		if (savePointAssignments):
			self.point_assignments[min_index].append([point, min_dist])

		point_accumulator[min_index] += point
		point_counts[min_index] += 1

	updated_centers = []
	self.point_counts = []

	for acc, count in zip(point_accumulator, point_counts):
		if (count != 0):
			updated_centers.append(acc / count)
			self.point_counts.append(count)

	self.centers = updated_centers
	self.voronoi = Voronoi(self.centers)

plot(color='r', marker='o', ax=None, name=None)

Plot the kmeans

Source code in code\kmeans.py
def plot(self, color='r', marker='o', ax=None, name=None):
	"""Plot the kmeans"""
	plot = create_data.Plotter()

	size = len(self.centers[0])

	if (ax == None):
		fig = plt.figure()

		if (size == 3):
			ax = fig.add_subplot(111, projection='3d')
		else:
			ax = fig.add_subplot(111)

	if (size == 3):
		(x_coords, y_coords, z_coords) = plot.pointFormatting(self.centers)
		ax.scatter(x_coords, y_coords, z_coords, c=color, marker=marker, label='Centers')
	else:
		(x_coords, y_coords, z_coords) = plot.pointFormatting(self.data)
		ax.scatter(x_coords, y_coords, c=color, marker=marker, label='Points')

		# voronoi_plot_2d(self.voronoi, ax=ax, show_vertices=False, line_colors='blue', line_width=1, line_alpha=0.6)

	ax.legend()

	if (name):
		plt.savefig(name)

	plt.show()

Landmark

Represents a location in the dataset where a voltage will be applied.

The index can refer either to an individual datapoint or a partition center.

Source code in code\voltage.py
class Landmark:
	"""
	Represents a location in the dataset where a voltage will be applied.

	The `index` can refer either to an individual datapoint or a partition center.
	"""

	def __init__(self, index: int, voltage: float) -> None:
		"""
		Initializes a Landmark.

		Args:
			index (int): Index of the datapoint or partition center.
			voltage (float): Voltage to be applied at the specified index.
		"""
		self.index = index
		self.voltage = voltage

	@staticmethod
	def createLandmarkClosestTo(
		data: List[Any],
		point: Any,
		voltage: float,
		distanceFn: Optional[object] = None,
		ignore: List[int] = []
	) -> "Landmark":
		"""
		Creates a Landmark at the index of the datapoint in `data` closest to `point`.

		Args:
			data (List[Any]): The dataset to search over.
			point (Any): The reference point to find the closest datapoint to.
			voltage (float): The voltage to assign to the resulting Landmark.
			distanceFn (Optional[object]): A distance function with a `.distance(a, b)` method.
										   Defaults to `kmeans.DistanceBased()` if None.
			ignore (List[int], optional): List of indices to skip during the search. Defaults to empty list.

		Returns:
			Landmark: A Landmark instance corresponding to the closest datapoint.
		"""
		if distanceFn is None:
			distanceFn = kmeans.DistanceBased()

		most_central_index = 0
		mindist = distanceFn.distance(data[0], point)

		for index in range(1, len(data)):
			if index in ignore:
				continue

			dist = distanceFn.distance(data[index], point)
			if dist < mindist:
				most_central_index = index
				mindist = dist

		return Landmark(most_central_index, voltage)

__init__(index, voltage)

Initializes a Landmark.

Parameters:

	index (int, required): Index of the datapoint or partition center.
	voltage (float, required): Voltage to be applied at the specified index.
Source code in code\voltage.py
def __init__(self, index: int, voltage: float) -> None:
	"""
	Initializes a Landmark.

	Args:
		index (int): Index of the datapoint or partition center.
		voltage (float): Voltage to be applied at the specified index.
	"""
	self.index = index
	self.voltage = voltage

createLandmarkClosestTo(data, point, voltage, distanceFn=None, ignore=[]) staticmethod

Creates a Landmark at the index of the datapoint in data closest to point.

Parameters:

	data (List[Any], required): The dataset to search over.
	point (Any, required): The reference point to find the closest datapoint to.
	voltage (float, required): The voltage to assign to the resulting Landmark.
	distanceFn (Optional[object], default None): A distance function with a .distance(a, b) method. Defaults to kmeans.DistanceBased() if None.
	ignore (List[int], default []): List of indices to skip during the search.

Returns:

	Landmark: A Landmark instance corresponding to the closest datapoint.

Source code in code\voltage.py
@staticmethod
def createLandmarkClosestTo(
	data: List[Any],
	point: Any,
	voltage: float,
	distanceFn: Optional[object] = None,
	ignore: List[int] = []
) -> "Landmark":
	"""
	Creates a Landmark at the index of the datapoint in `data` closest to `point`.

	Args:
		data (List[Any]): The dataset to search over.
		point (Any): The reference point to find the closest datapoint to.
		voltage (float): The voltage to assign to the resulting Landmark.
		distanceFn (Optional[object]): A distance function with a `.distance(a, b)` method.
									   Defaults to `kmeans.DistanceBased()` if None.
		ignore (List[int], optional): List of indices to skip during the search. Defaults to empty list.

	Returns:
		Landmark: A Landmark instance corresponding to the closest datapoint.
	"""
	if distanceFn is None:
		distanceFn = kmeans.DistanceBased()

	most_central_index = 0
	mindist = distanceFn.distance(data[0], point)

	for index in range(1, len(data)):
		if index in ignore:
			continue

		dist = distanceFn.distance(data[index], point)
		if dist < mindist:
			most_central_index = index
			mindist = dist

	return Landmark(most_central_index, voltage)
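
A short sketch of both construction paths (assuming `data` is a list of 2D np.ndarray points):

import numpy as np

# direct: apply 1.0 V at datapoint 0
source = Landmark(0, 1.0)

# indirect: ground (0 V) the datapoint nearest to the origin,
# skipping index 0 so the source and ground never coincide
ground = Landmark.createLandmarkClosestTo(data, np.zeros(2), 0.0, ignore=[0])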

Problem

Bases: DistanceBased

Represents the clustering/graph problem to be solved, extending a distance-based kernel with landmarks and weights.

Source code in code\voltage.py
class Problem(kmeans.DistanceBased):
	"""
	Represents the clustering/graph problem to be solved, 
	extending a distance-based kernel with landmarks and weights.
	"""

	def __init__(self, data: Any) -> None:
		"""
		Initializes the Problem instance.

		Args:
			data: An object containing your dataset. Must support len(data) 
				  and data.getNumpy() to return an (n, d) numpy array.
		"""
		super().__init__()
		self.data = data
		self.landmarks = []
		n = len(data)
		self.weights = np.zeros([n, n])
		self.universalGround = False

	def timeStart(self) -> None:
		"""
		Records the current time to measure elapsed intervals.
		"""
		self.start = time.time()

	def timeEnd(self, replace: bool = True) -> float:
		"""
		Computes the elapsed time since the last timeStart().

		Args:
			replace (bool): If True, resets the start time to now.

		Returns:
			float: Seconds elapsed since last start.
		"""
		cur_time = time.time()
		diff = cur_time - self.start
		if replace:
			self.start = cur_time
		return diff

	def setKernel(self, kernel: Callable[..., np.ndarray]) -> None:
		"""
		Sets the kernel function to use for weight computations.

		Args:
			kernel (callable): A function or callable object with signature
							   kernel(X, Y, *params) → ndarray of shape (|X|, |Y|).
		"""
		self.kernel = kernel

	def efficientSquareDistance(self, data: np.ndarray) -> np.ndarray:
		"""
		Computes the pairwise squared Euclidean distances of the rows in `data`.

		Uses the identity ‖x−y‖² = ‖x‖² + ‖y‖² − 2 x·y for efficiency.

		Args:
			data (ndarray): Array of shape (n, d).

		Returns:
			ndarray: Matrix of shape (n, n) where entry (i, j) is squared distance.
		"""
		data_norm2 = np.sum(data**2, axis=1)
		x_norm2 = data_norm2.reshape(-1, 1)
		y_norm2 = data_norm2.reshape(1, -1)
		return x_norm2 + y_norm2 - 2 * data @ data.T

	def radialkernel(self, data: np.ndarray, r: float) -> np.ndarray:
		"""
		Builds a binary (0/1) radial kernel: 1 if distance ≤ r, else 0.

		Args:
			data (ndarray): Array of shape (n, d).
			r (float): Radius threshold.

		Returns:
			ndarray: Adjacency-like matrix (n×n) of 0/1 floats.
		"""
		dist2 = self.efficientSquareDistance(data)
		return (dist2 <= r**2).astype(float)

	def gaussiankernel(self, data: np.ndarray, std: float) -> np.ndarray:
		"""
		Builds a Gaussian (RBF) kernel matrix.

		Args:
			data (ndarray): Array of shape (n, d).
			std (float): Standard deviation parameter for the Gaussian.

		Returns:
			ndarray: Kernel matrix of shape (n, n).
		"""
		dist2 = self.efficientSquareDistance(data)
		return np.exp(-dist2 / (2 * std**2))

	def setWeights(self, *c: Any) -> np.ndarray:
		"""
		Computes and normalizes the weight matrix on the original data.

		Args:
			*c: Parameters to pass into the currently set kernel function.

		Returns:
			ndarray: The normalized weight matrix (n×n).
		"""
		data_np = self.data.getNumpy()
		n = len(self.data)
		self.weights[:n, :n] = self.kernel(data_np, *c)
		self.normalizeWeights()
		return self.weights

	def normalizeWeights(self) -> None:
		"""
		Normalizes each row of the weight matrix to sum to 1.

		Raises:
			ValueError: If any row sums to zero, resulting in NaNs.
		"""
		self.weights = self.weights / self.weights.sum(axis=1, keepdims=True)
		if np.isnan(self.weights).any():
			raise ValueError("Array contains NaN values!")

	def setPartitionWeights(self, partition: Any, *c: Any) -> np.ndarray:
		"""
		Computes and normalizes weights based on cluster centers and sizes.

		Args:
			partition: An object with attributes `centers` (list of points)
					   and `point_counts` (counts per center).
			*c: Parameters to pass into the kernel function.

		Returns:
			ndarray: The normalized weight matrix for the partition block.
		"""
		centers = np.array(partition.centers)
		counts = np.array(partition.point_counts).reshape(-1, 1)
		K = self.kernel(centers[:, None], centers[None, :], *c)
		W = K * (counts @ counts.T)
		n = len(centers)
		self.weights[:n, :n] = W
		self.normalizeWeights()
		return self.weights

	def addUniversalGround(self, p_g: float = 0.01) -> np.ndarray:
		"""
		Adds (or updates) a 'universal ground' node connected uniformly to all others.

		Args:
			p_g (float): Total ground connection probability to distribute.

		Returns:
			ndarray: The updated normalized weight matrix including the ground node.
		"""
		if self.universalGround:
			n = self.weights.shape[0] - 1
			for x in range(n):
				self.weights[x, n] = p_g / n
				self.weights[n, x] = p_g / n
		else:
			self.universalGround = True
			n = self.weights.shape[0]
			newW = np.zeros([n + 1, n + 1])
			newW[:n, :n] = self.weights
			for x in range(n):
				newW[x, n] = p_g / n
				newW[n, x] = p_g / n
			self.weights = newW
			self.addLandmark(Landmark(n, 0))
		self.normalizeWeights()
		return self.weights

	def addLandmark(self, landmark: Landmark) -> None:
		"""
		Adds a single Landmark to the problem.

		Args:
			landmark (Landmark): The landmark instance to append.
		"""
		self.landmarks.append(landmark)

	def addLandmarks(self, landmarks: List[Landmark]) -> None:
		"""
		Adds multiple Landmark instances to the problem.

		Args:
			landmarks (List[Landmark]): List of landmarks to append.
		"""
		self.landmarks += landmarks

	def addLandmarksInRange(
		self, minRange: Union[List[float], np.ndarray],
		maxRange: Union[List[float], np.ndarray],
		voltage: float
	) -> List[Landmark]:
		"""
		Adds landmarks for all data points within a given coordinate range.

		Args:
			minRange (array-like): Minimum bounds per dimension.
			maxRange (array-like): Maximum bounds per dimension.
			voltage (float): Voltage to apply at each new landmark.

		Returns:
			List[Landmark]: The list of newly added landmarks.
		"""
		adding = []
		data_np = self.data.getNumpy()
		for idx, point in enumerate(data_np):
			if np.all(point >= minRange) and np.all(point <= maxRange):
				adding.append(Landmark(idx, voltage))
		self.addLandmarks(adding)
		return adding

__init__(data)

Initializes the Problem instance.

Parameters:

	data (Any, required): An object containing your dataset. Must support len(data) and data.getNumpy() to return an (n, d) numpy array.
Source code in code\voltage.py
def __init__(self, data: Any) -> None:
	"""
	Initializes the Problem instance.

	Args:
		data: An object containing your dataset. Must support len(data) 
			  and data.getNumpy() to return an (n, d) numpy array.
	"""
	super().__init__()
	self.data = data
	self.landmarks = []
	n = len(data)
	self.weights = np.zeros([n, n])
	self.universalGround = False

addLandmark(landmark)

Adds a single Landmark to the problem.

Parameters:

	landmark (Landmark, required): The landmark instance to append.
Source code in code\voltage.py
def addLandmark(self, landmark: Landmark) -> None:
	"""
	Adds a single Landmark to the problem.

	Args:
		landmark (Landmark): The landmark instance to append.
	"""
	self.landmarks.append(landmark)

addLandmarks(landmarks)

Adds multiple Landmark instances to the problem.

Parameters:

	landmarks (List[Landmark], required): List of landmarks to append.
Source code in code\voltage.py
def addLandmarks(self, landmarks: List[Landmark]) -> None:
	"""
	Adds multiple Landmark instances to the problem.

	Args:
		landmarks (List[Landmark]): List of landmarks to append.
	"""
	self.landmarks += landmarks

addLandmarksInRange(minRange, maxRange, voltage)

Adds landmarks for all data points within a given coordinate range.

Parameters:

	minRange (array-like, required): Minimum bounds per dimension.
	maxRange (array-like, required): Maximum bounds per dimension.
	voltage (float, required): Voltage to apply at each new landmark.

Returns:

	List[Landmark]: The list of newly added landmarks.

Source code in code\voltage.py
def addLandmarksInRange(
	self, minRange: Union[List[float], np.ndarray],
	maxRange: Union[List[float], np.ndarray],
	voltage: float
) -> List[Landmark]:
	"""
	Adds landmarks for all data points within a given coordinate range.

	Args:
		minRange (array-like): Minimum bounds per dimension.
		maxRange (array-like): Maximum bounds per dimension.
		voltage (float): Voltage to apply at each new landmark.

	Returns:
		List[Landmark]: The list of newly added landmarks.
	"""
	adding = []
	data_np = self.data.getNumpy()
	for idx, point in enumerate(data_np):
		if np.all(point >= minRange) and np.all(point <= maxRange):
			adding.append(Landmark(idx, voltage))
	self.addLandmarks(adding)
	return adding
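
Continuing the sketch above, every point falling inside an axis-aligned box can be pinned in one call; the min/max bounds are compared per dimension via NumPy broadcasting:

# Pin every point in the lower-left quadrant of the unit square to 1.0.
pinned = problem.addLandmarksInRange([0.0, 0.0], [0.5, 0.5], voltage=1.0)
print(f"{len(pinned)} landmarks added")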

addUniversalGround(p_g=0.01)

Adds (or updates) a 'universal ground' node connected uniformly to all others.

Parameters:

	p_g (float): Total ground connection probability to distribute. [default: 0.01]

Returns:

	ndarray: The updated normalized weight matrix including the ground node.

Source code in code\voltage.py
def addUniversalGround(self, p_g: float = 0.01) -> np.ndarray:
	"""
	Adds (or updates) a 'universal ground' node connected uniformly to all others.

	Args:
		p_g (float): Total ground connection probability to distribute.

	Returns:
		ndarray: The updated normalized weight matrix including the ground node.
	"""
	if self.universalGround:
		n = self.weights.shape[0] - 1
		for x in range(n):
			self.weights[x, n] = p_g / n
			self.weights[n, x] = p_g / n
	else:
		self.universalGround = True
		n = self.weights.shape[0]
		newW = np.zeros([n + 1, n + 1])
		newW[:n, :n] = self.weights
		for x in range(n):
			newW[x, n] = p_g / n
			newW[n, x] = p_g / n
		self.weights = newW
		self.addLandmark(Landmark(n, 0))
	self.normalizeWeights()
	return self.weights
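
The first call grows the weight matrix by one row and column for the ground node and pins it at 0 V as an extra landmark; subsequent calls only rewrite the ground row/column with the new p_g. Continuing the sketch:

n = problem.weights.shape[0]
W = problem.addUniversalGround(p_g=0.01)
assert W.shape == (n + 1, n + 1)   # one extra row/column for the ground node
# The ground node itself is registered as a landmark at 0 V (index n).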

efficientSquareDistance(data)

Computes the pairwise squared Euclidean distances of the rows in data.

Uses the identity ‖x−y‖² = ‖x‖² + ‖y‖² − 2 x·y for efficiency.

Parameters:

	data (ndarray): Array of shape (n, d). [required]

Returns:

	ndarray: Matrix of shape (n, n) where entry (i, j) is the squared distance between rows i and j.

Source code in code\voltage.py
def efficientSquareDistance(self, data: np.ndarray) -> np.ndarray:
	"""
	Computes the pairwise squared Euclidean distances of the rows in `data`.

	Uses the identity ‖x−y‖² = ‖x‖² + ‖y‖² − 2 x·y for efficiency.

	Args:
		data (ndarray): Array of shape (n, d).

	Returns:
		ndarray: Matrix of shape (n, n) where entry (i, j) is squared distance.
	"""
	data_norm2 = np.sum(data**2, axis=1)
	x_norm2 = data_norm2.reshape(-1, 1)
	y_norm2 = data_norm2.reshape(1, -1)
	return x_norm2 + y_norm2 - 2 * data @ data.T
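
The vectorized identity can be sanity-checked against the naive double loop; note that floating-point cancellation can leave tiny negative values on the diagonal:

X = np.random.rand(6, 3)
D2 = problem.efficientSquareDistance(X)

naive = np.array([[np.sum((x - y) ** 2) for y in X] for x in X])
assert np.allclose(D2, naive)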

gaussiankernel(data, std)

Builds a Gaussian (RBF) kernel matrix.

Parameters:

	data (ndarray): Array of shape (n, d). [required]
	std (float): Standard deviation parameter for the Gaussian. [required]

Returns:

	ndarray: Kernel matrix of shape (n, n).

Source code in code\voltage.py
def gaussiankernel(self, data: np.ndarray, std: float) -> np.ndarray:
	"""
	Builds a Gaussian (RBF) kernel matrix.

	Args:
		data (ndarray): Array of shape (n, d).
		std (float): Standard deviation parameter for the Gaussian.

	Returns:
		ndarray: Kernel matrix of shape (n, n).
	"""
	dist2 = self.efficientSquareDistance(data)
	return np.exp(-dist2 / (2 * std**2))
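
Each diagonal entry is exp(0) = 1, and off-diagonal entries decay with squared distance; std sets the neighborhood scale. Reusing X from the distance example above:

K = problem.gaussiankernel(X, std=0.5)
assert np.allclose(np.diag(K), 1.0)   # zero self-distance maps to exp(0) = 1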

normalizeWeights()

Normalizes each row of the weight matrix to sum to 1.

Raises:

	ValueError: If any row sums to zero, resulting in NaNs.

Source code in code\voltage.py
def normalizeWeights(self) -> None:
	"""
	Normalizes each row of the weight matrix to sum to 1.

	Raises:
		ValueError: If any row sums to zero, resulting in NaNs.
	"""
	self.weights = self.weights / self.weights.sum(axis=1, keepdims=True)
	if np.isnan(self.weights).any():
		raise ValueError("Array contains NaN values!")
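
After normalization the weights form a row-stochastic matrix, i.e. each row is a probability distribution over neighbors, which is what the voltage iteration in Solver relies on:

problem.setWeights(0.2)   # setWeights calls normalizeWeights internally
assert np.allclose(problem.weights.sum(axis=1), 1.0)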

radialkernel(data, r)

Builds a binary (0/1) radial kernel: 1 if distance ≤ r, else 0.

Parameters:

	data (ndarray): Array of shape (n, d). [required]
	r (float): Radius threshold. [required]

Returns:

	ndarray: Adjacency-like matrix (n×n) of 0/1 floats.

Source code in code\voltage.py
def radialkernel(self, data: np.ndarray, r: float) -> np.ndarray:
	"""
	Builds a binary (0/1) radial kernel: 1 if distance ≤ r, else 0.

	Args:
		data (ndarray): Array of shape (n, d).
		r (float): Radius threshold.

	Returns:
		ndarray: Adjacency-like matrix (n×n) of 0/1 floats.
	"""
	dist2 = self.efficientSquareDistance(data)
	return (dist2 <= r**2).astype(float)
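
The result is a binary adjacency matrix; since every point is within distance r of itself, the diagonal is all ones (self-loops are included). Again with X from above:

A = problem.radialkernel(X, r=0.3)
assert np.all(np.diag(A) == 1.0)         # self-loops included
assert set(np.unique(A)) <= {0.0, 1.0}   # strictly binary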

setKernel(kernel)

Sets the kernel function to use for weight computations.

Parameters:

	kernel (callable): A function or callable object with signature kernel(X, Y, *params) → ndarray of shape (|X|, |Y|). [required]
Source code in code\voltage.py
def setKernel(self, kernel: Callable[..., np.ndarray]) -> None:
	"""
	Sets the kernel function to use for weight computations.

	Args:
		kernel (callable): A function or callable object with signature
						   kernel(X, Y, *params) → ndarray of shape (|X|, |Y|).
	"""
	self.kernel = kernel
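
Note that setWeights invokes the kernel as kernel(data, *params) on a single (n, d) array — the convention gaussiankernel and radialkernel follow — while setPartitionWeights uses the two-array form kernel(X, Y, *params) described here. A hypothetical custom kernel for the setWeights convention (inverseDistanceKernel is illustrative, not part of the module) might look like:

def inverseDistanceKernel(data, eps):
	"""Hypothetical kernel: w_ij = 1 / (||x_i - x_j||^2 + eps)."""
	d2 = problem.efficientSquareDistance(data)
	return 1.0 / (d2 + eps)

problem.setKernel(inverseDistanceKernel)
problem.setWeights(1e-3)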

setPartitionWeights(partition, *c)

Computes and normalizes weights based on cluster centers and sizes.

Parameters:

	partition (Any): An object with attributes centers (list of points) and point_counts (counts per center). [required]
	*c (Any): Parameters to pass into the kernel function.

Returns:

	ndarray: The normalized weight matrix for the partition block.

Source code in code\voltage.py
def setPartitionWeights(self, partition: Any, *c: Any) -> np.ndarray:
	"""
	Computes and normalizes weights based on cluster centers and sizes.

	Args:
		partition: An object with attributes `centers` (list of points)
				   and `point_counts` (counts per center).
		*c: Parameters to pass into the kernel function.

	Returns:
		ndarray: The normalized weight matrix for the partition block.
	"""
	centers = np.array(partition.centers)
	counts = np.array(partition.point_counts).reshape(-1, 1)
	K = self.kernel(centers[:, None], centers[None, :], *c)
	W = K * (counts @ counts.T)
	n = len(centers)
	self.weights[:n, :n] = W
	self.normalizeWeights()
	return self.weights

setWeights(*c)

Computes and normalizes the weight matrix on the original data.

Parameters:

	*c (Any): Parameters to pass into the currently set kernel function.

Returns:

	ndarray: The normalized weight matrix (n×n).

Source code in code\voltage.py
def setWeights(self, *c: Any) -> np.ndarray:
	"""
	Computes and normalizes the weight matrix on the original data.

	Args:
		*c: Parameters to pass into the currently set kernel function.

	Returns:
		ndarray: The normalized weight matrix (n×n).
	"""
	data_np = self.data.getNumpy()
	n = len(self.data)
	self.weights[:n, :n] = self.kernel(data_np, *c)
	self.normalizeWeights()
	return self.weights

timeEnd(replace=True)

Computes the elapsed time since the last timeStart().

Parameters:

	replace (bool): If True, resets the start time to now. [default: True]

Returns:

	float: Seconds elapsed since last start.

Source code in code\voltage.py
def timeEnd(self, replace: bool = True) -> float:
	"""
	Computes the elapsed time since the last timeStart().

	Args:
		replace (bool): If True, resets the start time to now.

	Returns:
		float: Seconds elapsed since last start.
	"""
	cur_time = time.time()
	diff = cur_time - self.start
	if replace:
		self.start = cur_time
	return diff

timeStart()

Records the current time to measure elapsed intervals.

Source code in code\voltage.py
def timeStart(self) -> None:
	"""
	Records the current time to measure elapsed intervals.
	"""
	self.start = time.time()
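
Together, timeStart() and timeEnd() give a lightweight way to profile the expensive steps, e.g.:

problem.timeStart()
problem.setWeights(0.2)
print(f"setWeights took {problem.timeEnd():.3f}s")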

Solver

Bases: DistanceBased

Solves a given Problem

Source code in code\voltage.py
class Solver(kmeans.DistanceBased):
	"""Solves a given Problem"""
	def __init__(self, problem):
		self.problem = problem
		super().__init__()

	def compute_voltages(self):
		n = self.problem.weights.shape[0]

		constrained_nodes = [l.index for l in self.problem.landmarks]
		unconstrained_nodes = [i for i in range(n) if i not in constrained_nodes]

		# Right-hand side: each node's weighted contribution from the fixed
		# landmark voltages, b[y] = sum_l voltage_l * W[y, index_l].
		b = np.zeros(n)
		for landmark in self.problem.landmarks:
			for y in range(n):
				b[y] += landmark.voltage * self.problem.weights[y, landmark.index]

		# Restrict to the free nodes and solve (I - W_uu) v_u = b_u directly.
		A_unconstrained = np.identity(len(unconstrained_nodes)) - self.problem.weights[np.ix_(unconstrained_nodes, unconstrained_nodes)]
		b_unconstrained = b[unconstrained_nodes]

		v_unconstrained = solve(A_unconstrained, b_unconstrained)

		# Reassemble the full voltage vector; landmarks keep their fixed values.
		self.voltages = np.zeros(n)
		for landmark in self.problem.landmarks:
			self.voltages[landmark.index] = landmark.voltage
		self.voltages[unconstrained_nodes] = v_unconstrained

		# Drop the auxiliary ground node before returning.
		if self.problem.universalGround:
			self.voltages = self.voltages[:-1]

		return self.voltages

	def approximate_voltages(self, epsilon=None, max_iters=None):
		n = self.problem.weights.shape[0]

		# Default stopping criterion: tolerance of 1/n when neither is given.
		if epsilon is None and max_iters is None:
			epsilon = 1 / n

		constrained_nodes = [l.index for l in self.problem.landmarks]
		constraints = [l.voltage for l in self.problem.landmarks]

		self.voltages = np.zeros(n)
		voltages = np.zeros(n)

		for landmark in self.problem.landmarks:
			self.voltages[landmark.index] = landmark.voltage

		dist = self.distance(self.voltages, voltages)
		prev_dist = float('inf')
		iterations = 0

		# Fixed-point iteration: average each node's voltage over its neighbors
		# (one multiplication by the row-stochastic weight matrix), then re-impose
		# the landmark constraints. Stop when the update is small enough, the
		# iteration budget is spent, or progress stalls (dist >= prev_dist).
		while (((epsilon is not None and dist > epsilon * len(self.problem.data))
				or (max_iters is not None and iterations < max_iters)) and dist < prev_dist):
			voltages = np.matmul(self.problem.weights, self.voltages)
			voltages[constrained_nodes] = constraints
			prev_dist = dist
			dist = self.distance(self.voltages, voltages)
			self.voltages = voltages
			iterations += 1

		# Drop the auxiliary ground node before returning.
		if self.problem.universalGround:
			self.voltages = self.voltages[:-1]

		return self.voltages

	def localSolver(self, partitions, c):
		voltages = [0 for i in range(len(self.problem.data))]

		for index in range(partitions.k):
			closestIndices = partitions.getClosestPoints(index)

			# Partition centers sharing a Voronoi ridge with this one act as
			# fixed-voltage landmarks for the local subproblem.
			closeLandmarkIndices = []
			for pair in partitions.voronoi.ridge_points:
				if pair[0] == index:
					closeLandmarkIndices.append(pair[1])
				if pair[1] == index:
					closeLandmarkIndices.append(pair[0])

			closeLandmarks = []
			for cli in closeLandmarkIndices:
				closeLandmarks.append(Landmark(cli, self.voltages[cli]))

			# Build and solve a local Problem restricted to this partition's points.
			localProblem = Problem(self.problem.data.getSubSet(closestIndices))
			localProblem.setKernel(localProblem.gaussiankernel)
			localProblem.setWeights(c)
			localProblem.addLandmarks(closeLandmarks)
			localVoltages = Solver(localProblem).compute_voltages()

			for i, v in zip(closestIndices, localVoltages):
				voltages[i] = v

		return voltages
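
Continuing the earlier Problem sketch, solving for the free voltages takes one call; compute_voltages performs a direct linear solve, while approximate_voltages iterates until the update falls below epsilon (defaulting to 1/n when no criterion is given):

solver = Solver(problem)
v_exact = solver.compute_voltages()       # direct solve of (I - W_uu) v_u = b_u
v_approx = solver.approximate_voltages()  # fixed-point iteration

assert v_exact[0] == 1.0 and v_exact[1] == 0.0   # landmarks stay pinned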