Source code for omfit_classes.omfit_timcon
try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise
from omfit_classes.omfit_ascii import OMFITascii
from scipy import signal
__all__ = ['OMFITtimcon']
[docs]class OMFITtimcon(SortedDict, OMFITascii):
    """
    OMFIT class for parsing/writing the NBI time traces of the DIII-D PCS
    """
    def __init__(self, filename):
        OMFITascii.__init__(self, filename)
        SortedDict.__init__(self)
        self.dynaLoad = True
[docs]    @dynaLoad
    def load(self):
        with open(self.filename, 'r') as _f:
            lines = _f.read().split('\n')
        nummod = int(lines[0].split('=')[1])
        numphases = int(lines[1].split('=')[1])
        k = 2
        mode_lookup = {0: 'NoMod', 1: 'PCS', 2: 'TIMCON', 3: 'CALORIMETER', 4: r'PCS/TIMCON'}
        states_lookup = {
            0: 'PRI:AnyGen',
            1: 'PRI:LikeSub',
            2: 'PRI:NoSub',
            3: 'PRI:SpecialA',
            4: 'PRI:SpecialB',
            5: 'PRI:SpecialC',
            6: 'SEC:Gen/Like',
            7: 'SEC:SpecialA',
            8: 'SEC:SpecialB',
            9: 'SEC:SpecialC',
        }
        for mod in range(nummod):
            tmp = lines[k].split('-')
            modname = tmp[0].strip().split()[1]
            self[modname] = SortedDict()
            tmp = tmp[1].split()
            self[modname]['enabled'] = bool(int(tmp[0]))
            self[modname]['total_on_time'] = float(tmp[1])
            self[modname]['mode'] = int(tmp[2])
            self[modname]['mode_txt'] = mode_lookup[self[modname]['mode']]
            self[modname]['beam_status'] = int(tmp[3])
            self[modname]['beam_status_txt'] = states_lookup[self[modname]['beam_status']]
            self[modname]['sub_beam'] = self[modname]['mode_txt'] == 'PCS' and self[modname]['beam_status_txt'].find('SEC') == 0
            self[modname]['sub_priority'] = int(tmp[4])
            self[modname]['__unknown__'] = int(tmp[5])
            k += 1
            for phase in range(1, numphases + 1):
                tmp = list(map(eval, lines[k].split(':')[1].split()))
                self[modname]['PHASE%d' % phase] = SortedDict()
                for k1, item in enumerate(['__enabled__', 'start', 'on', 'off', 'duration', 'pulses']):
                    self[modname]['PHASE%d' % phase][item] = tmp[k1]
                # note the timcon files generated by timcon always have the enabled phase switched turned on (that's why we hide it here)
                self[modname]['PHASE%d' % phase]['__enabled__'] = bool(self[modname]['PHASE%d' % phase]['__enabled__'])
                k += 1
[docs]    @dynaSave
    def save(self):
        lines = []
        lines.append('nummod = %d' % len(self))
        numphases = 100
        for modname in self:
            numphases = min([numphases, len([phase for phase in self[modname] if phase.startswith('PHASE')])])
        lines.append('numphases = %d' % numphases)
        for kmod, modname in enumerate(self):
            lines.append(
                (
                    'SRC%d %s - {enabled:d} {total_on_time:0.6f} {mode:d} {beam_status:d} {sub_priority:d} {__unknown__:d}'
                    % (kmod + 1, modname)
                ).format(**self[modname])
            )
            for phase in self[modname]:
                if not phase.startswith('PHASE'):
                    continue
                lines.append(
                    (
                        '%s: %d %d %d %d %d %0.6f'
                        % (
                            phase,
                            self[modname][phase]['__enabled__'],
                            self[modname][phase]['start'],
                            self[modname][phase]['on'],
                            self[modname][phase]['off'],
                            self[modname][phase]['duration'],
                            self[modname][phase]['pulses'],
                        )
                    )
                )
        with open(self.filename, 'w') as f:
            f.write('\n'.join(lines) + '\n')
[docs]    @dynaLoad
    def waveform(self, beam, x, y=None, phases=None, duty_cycle_1=False):
        if phases is None:
            phases = [phase for phase in self[beam] if phase.startswith('PHASE')]
        else:
            phases = tolist(phases)
        if y is None:
            y = x * 0
        starts = np.array([self[beam][phase]['start'] for phase in phases])
        durations = np.array([self[beam][phase]['duration'] for phase in phases])
        ons = np.array([self[beam][phase]['on'] for phase in phases])
        ons *= np.array([self[beam]['enabled'] * self[beam][phase]['__enabled__'] for phase in phases])
        offs = np.array([self[beam][phase]['off'] for phase in phases])
        for k in range(len(phases)):
            x0 = starts[k]
            dur = durations[k]
            on1 = ons[k]
            off1 = offs[k]
            if on1 == 0:
                # abort if no on time
                continue
            # calculations
            x1 = x0 + dur
            if duty_cycle_1:
                period = 100
                duty = 1.0
            else:
                period = on1 + off1
                duty = on1 / float(period)
            # form the output
            w = np.where((x > x0) * (x < x1))
            tt = (x[w] - x0) * 2 * np.pi / period
            y[w] = (signal.square(tt, duty=duty) + 1.0) / 2.0
        return x, y
[docs]    @dynaLoad
    def plot_waveform(self, beam, ax=None, show_legend=True):
        x = np.linspace(0, 10000, 10000)
        if ax is None:
            ax = pyplot.gca()
        x, y = self.waveform(beam, x)
        color_blind_color_cycle = [
            '#1f77b4',
            '#ff7f0e',
            '#2ca02c',
            '#d62728',
            '#9467bd',
            '#8c564b',
            '#e377c2',
            '#7f7f7f',
            '#bcbd22',
            '#17becf',
        ]
        label = f"{self[beam]['mode_txt']} {self[beam]['beam_status_txt']}"
        ax.plot(x, y, color=color_blind_color_cycle[self[beam]['mode']], label=label)
        ax.set_ylim([0, 2])
        ax.set_ylabel(beam)
        for kphase, phase in enumerate(self[beam]):
            if not phase.startswith('PHASE'):
                continue
            ax.axvspan(
                self[beam][phase]['start'],
                self[beam][phase]['start'] + self[beam][phase]['duration'],
                color=['lightcoral', 'turquoise'][np.mod(kphase, 2)],
                alpha=0.15,
                linewidth=0,
            )
        ax.set_yticks([0, 1])
        ax.set_xlabel('Time (ms)')
        if show_legend:
            ax.legend(loc=0, frameon=False, text_same_color=True, hide_markers=True, fontsize='small')
        return x, y
[docs]    @dynaLoad
    def plot(self):
        x = np.linspace(0, 10000, 10000)
        y = x * 0
        nrow = len(self) + 1
        # what magic is making this method create a new figure?
        # it is not transparent how I a) pass an existing figure for over-plotting, b) specify the size :(
        fig = pyplot.gcf()
        fig.set_size_inches(8, 9)
        ax = pyplot.subplot(nrow, 1, 1)
        ax.set_xlim(x[0], x[-1])  # have to get the x10^4 label in now so the next axes hide it
        ax.set_ylabel('ALL')
        ax.set_ylim(0, len(self) + 2)
        for i, beam in enumerate(self):
            ai = pyplot.subplot(nrow, 1, i + 2, sharex=ax)
            x, y0 = self.plot_waveform(beam, ax=ai)
            y += y0
        ax.plot(x, y, 'k')
        autofmt_sharex(hspace=0.1)