# Source code for deep_time_series.chunk
import numpy as np
import torch
import xarray as xr
class BaseChunkSpec:
    """Validated description of one chunk: a tag, feature names, a range and a dtype.

    Subclasses set ``PREFIX`` to namespace their tags. When ``PREFIX`` is
    non-empty, a tag that is not already written as ``"<PREFIX>.<name>"`` gets
    the prefix (and a dot) prepended on assignment.

    Args:
        tag: Identifier of the chunk (``str``).
        names: List or tuple of feature names (all ``str``).
        range_: Pair of ``int`` values with ``range_[0] < range_[1]``.
        dtype: Anything ``numpy.dtype`` accepts.

    Raises:
        TypeError: If an argument has an unsupported type.
        ValueError: If ``range_`` does not have exactly two elements or is
            not strictly increasing.
    """

    PREFIX = ''

    def __init__(self, tag, names, range_, dtype):
        self.tag = tag
        self.names = names
        self.range = range_
        self.dtype = dtype

    def _require(self, attr: str):
        """Return the stored value of ``attr`` or raise if it was never set."""
        # getattr with a default so that instances whose __init__ was never
        # run report the helpful NotImplementedError, not an AttributeError.
        value = getattr(self, f'_{attr}', None)
        if value is None:
            raise NotImplementedError(
                f'Define {self.__class__.__name__}.{attr}'
            )
        return value

    @property
    def tag(self) -> str:
        """Tag of the chunk, including ``PREFIX`` when one is defined."""
        return self._require('tag')

    @tag.setter
    def tag(self, value: str):
        if not isinstance(value, str):
            raise TypeError(f'Invalid type for "tag": {type(value)}')
        prefix = self.PREFIX
        # Compare against "<prefix>." so that e.g. "encoding_x" is not
        # mistaken for an already-prefixed "encoding.x". An empty prefix
        # leaves the tag untouched.
        if prefix and not value.startswith(f'{prefix}.'):
            value = f'{prefix}.{value}'
        self._tag = value

    @property
    def names(self) -> list[str]:
        """Feature names of the chunk, stored as a list."""
        return self._require('names')

    @names.setter
    def names(self, value):
        if not isinstance(value, (list, tuple)):
            raise TypeError('Invalid type for "names"')
        if not all(isinstance(name, str) for name in value):
            raise TypeError('Invalid type for "names"')
        self._names = list(value)

    @property
    def range(self) -> tuple[int, int]:
        """Strictly increasing ``(start, end)`` pair, stored as a tuple."""
        return self._require('range')

    @range.setter
    def range(self, value: tuple[int, int]):
        if not isinstance(value, (tuple, list)):
            raise TypeError(f'Invalid type for "range": {type(value)}')
        if len(value) != 2:
            raise ValueError(
                f'"range" must have exactly 2 elements, got {len(value)}'
            )
        for index, bound in enumerate(value):
            if not isinstance(bound, int):
                # Report the type of the offending element, not the container.
                raise TypeError(
                    f'Invalid type for "range[{index}]": {type(bound)}'
                )
        if value[0] >= value[1]:
            raise ValueError('range[0] >= range[1]')
        self._range = tuple(value)

    @property
    def dtype(self) -> np.dtype:
        """Data type of the chunk as a ``numpy.dtype``."""
        return self._require('dtype')

    @dtype.setter
    def dtype(self, value: np.dtype):
        # np.dtype() itself is the validator: it raises TypeError for
        # anything it cannot interpret.
        try:
            self._dtype = np.dtype(value)
        except TypeError as err:
            raise TypeError(
                f'Invalid type for "dtype": {type(value)}'
            ) from err
class EncodingChunkSpec(BaseChunkSpec):
    """Chunk spec whose tags live under the ``encoding`` prefix.

    The prefix is applied by the ``BaseChunkSpec.tag`` setter.
    """

    PREFIX = 'encoding'
class DecodingChunkSpec(BaseChunkSpec):
    """Chunk spec whose tags live under the ``decoding`` prefix.

    The prefix is applied by the ``BaseChunkSpec.tag`` setter.
    """

    PREFIX = 'decoding'
class LabelChunkSpec(BaseChunkSpec):
    """Chunk spec whose tags live under the ``label`` prefix.

    The prefix is applied by the ``BaseChunkSpec.tag`` setter.
    """

    PREFIX = 'label'
[docs]class ChunkInverter:
def __init__(self, chunk_specs: list[BaseChunkSpec]):
"""Class to convert tensor to DataFrame."""
self.chunk_specs = chunk_specs
# Core tag to names dict.
self.core_tag_dict = {}
for spec in chunk_specs:
core_tag = spec.tag.split('.')[1]
if core_tag not in self.core_tag_dict:
self.core_tag_dict[core_tag] = spec.names
else:
a = np.array(self.core_tag_dict[core_tag])
b = np.array(spec.names)
assert np.all(a == b)
self.tag_dict = {spec.tag: spec.names for spec in chunk_specs}
def _convert_to_numpy(self, tensor: torch.Tensor | np.ndarray):
if isinstance(tensor, np.ndarray):
return tensor
elif isinstance(tensor, torch.Tensor):
return tensor.cpu().numpy()
else:
raise TypeError(f'Invalid type for tensor {type(tensor)}')
def _infer_shape(self, tensor: np.ndarray):
pass
def invert(self, tag: str, tensor: torch.Tensor | np.ndarray):
if tag in self.tag_dict:
names = self.tag_dict[tag]
elif tag in self.core_tag_dict:
names = self.core_tag_dict[tag]
elif tag.split('.')[1] in self.core_tag_dict:
names = self.core_tag_dict[tag.split('.')[1]]
else:
raise ValueError(f'"{tag}" not in chunk_specs.')
data = self._convert_to_numpy(tensor)
da = xr.DataArray(
data=data,
dims=('batch_index', 'time_index', 'feature'),
name='',
)
# TODO: support for multi-dimensional features (like video).
df = da.to_dataframe().unstack(2)
df.columns = names
return df
def invert_dict(
self,
data: dict[str, torch.Tensor | np.ndarray],
):
outputs = {}
for tag, tensor in data.items():
print(tag)
outputs[tag] = self.invert(tag, tensor)
return outputs