Поделиться через


Изменения отметок времени

В примерах из этого раздела показано использование операторов для изменения отметки времени события. Путем изменения отметки времени для события можно изменить воздействие событий на последующие операторы, такие как окна и соединения, агрегатные операции с окнами и т. д. Эти функции доступны через следующие методы расширения LINQ.

Сдвиг времени события

Оператор ShiftEventTime() изменяет время начала каждого события в потоке в соответствии с указанным выражением.

В следующем примере время каждого события в потоке сдвигается вперед на 15 минут.

// shift events by 15 minutes into the future.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromMinutes(15)); 

В следующем примере время каждого события в потоке сдвигается назад на 1 час.

// shift events by 1 hour into the past.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromHours(-1));

Выражение для указания сдвига времени может ссылаться на текущее время начала события, но не на время окончания и не на полезные данные. Сдвиг не изменяет время существования события и его полезные данные.

Значение DateTime.MinValue представляет значение времени, соответствующее минус бесконечности. Если время начала события имеет это значение и на него ссылается указанное выражение (а не константа), то выражение не вычисляется и время начала остается равным DateTime.MinValue. В противном случае выражение будет вычислено во время выполнения, что, тем не менее, может вызвать исключение переполнения.

Обратите внимание, что указанный сдвиг времени также применяется к событиям CTI, обрабатываемым этим оператором, поскольку метод ShiftEventTime влияет на время начала всех событий в потоке.

Изменение продолжительности события

Оператор AlterEventDuration() изменяет время существования события. Время существования события задает интервал времени, в течение которого событие является допустимым. Продолжительность определяется в качестве функции для события, ее можно вычислить по начальному времени, конечному времени или полезным данным события.

В следующем примере задается продолжительность события в 1 час.

// set event duration to 1 hour.
var onehour = inputStream.AlterEventDuration(e => TimeSpan.FromHours(1));

В следующем примере для события задается продолжительность, вдвое превышающая текущее время существования события.

// double event duration. 
var doubled = inputStream.AlterEventDuration(e => (e.EndTime - e.StartTime) * 2);

Значение DateTime.MaxValue представляет значение времени, соответствующее плюс бесконечности. Если время окончания события имеет это значение и на него ссылается указанное выражение, выражение не вычисляется и время окончания остается равным DateTime.MaxValue.

Одновременный сдвиг события и изменение продолжительности

Оператор AlterEventLifetime() сочетает функции AlterEventDuration и ShiftEventTime, обеспечивая максимально широкие возможности.

Первый параметр метода AlterEventLifeTime() задает новую отметку времени начала и может указывать на время начала текущего события. Этот параметр должен указываться как время в формате UTC. Второй параметр задает новое время существования и может указывать на время начала, время окончания и поля полезных данных текущего события.

В следующем примере время события сдвигается на 1 минуту назад, однако его конечное время не изменяется (что означает, что к его времени существования добавляется одна минута). Новое время существования указывается во втором параметре.

// shift event 1 minute into the past, but leave the end time (event duration) unchanged.
var newStream = inputStream.AlterEventLifetime(e => e.StartTime - TimeSpan.FromMinutes(1),
                                               e => e.EndTime - e.StartTime + TimeSpan.FromMinutes(1));]

Обратите внимание, что сдвиг времени начала применяется к событиям CTI, обрабатываемым этим оператором.

См. также примечания, относящиеся к значениям DateTime.MinValue и DateTime.MaxValue ранее в этом разделе.

Преобразование потока в поток точечных событий

Оператор ToPointEventStream является удобным средством для преобразования граничных и интервальных событий в точечные (путем изменения времени существования событий на один такт с начала события), как показано в следующем примере.

var pointStream = inputStream.ToPointEventStream();

При преобразовании интервальных событий в точечные сохраняется только время начала точечных событий.

Изменение продолжительности события

Оператор ClipEventDuration принимает два потока в качестве параметров и изменяет время существования каждого события в первом потоке в соответствии с временем запуска следующего события совпадения во втором потоке.

До сих пор использовались операторы, которые позволяют изменять время существования события на фиксированный промежуток времени. Оператор ClipEventDuration обеспечивает очень гибкий способ изменения времени существования событий по отношению к другим событиям. В целом, этот оператор указывается на одном потоке и принимает другой поток в качестве параметра в месте с условием совпадения. Оператор изменяет время существования каждого события в первом потоке на время запуска «следующего» события (в плане времени применения) в другом потоке, который выполняет условие совпадения.

В качестве примера представим два потока, stream1 и stream2, которые содержат события с полем полезных данных «Идентификатор». Следующая инструкция изменяет все события в stream1 на следующее событие в stream2, имеющее такое же значение поля «Идентификатор»:

var clipped = stream1.ClipEventDuration(stream2, (e1, e2) => e1.Id == e2.Id);

Условие совпадения приводится в качестве выражения над обоими полями с полезными данными. Семантика этой инструкции показана на следующей схеме.

Семантика ClipEventDuration

На диаграмме показано, как первое событие в stream1 с идентификатором, равным А, изменяется на следующее событие с идентификатором, равным А, в stream2. Другое событие в stream1, где идентификатор равен В, не изменяется, поскольку следующее событие совпадения в stream2 имеет место только после завершения события в stream1.

Такое поведение изменения определяет большое количество способов применения. Одно общее требование, которое должно быть выполнено, заключается в преобразовании потока точек в поток непрерывных интервалов, который также называется «сигнал».

Указать на преобразование сигнала

В этом случае сначала необходимо расширить все точечные события так, чтобы они достигали следующего события. Другими словами, необходимо применить время ожидания, которое определяет, насколько долго событие должно длиться до того, как произойдет следующее событие. Время ожидания может быть конечным или бесконечным промежутком времени. Предположим, время ожидания составляет 60 секунд.

var extended = input.AlterEventDuration(e => TimeSpan.FromSeconds(60));

С такой настройкой можно использовать операторClipEventDuration, а поток использовать в качестве параметра. В результате каждое событие будет изменено на следующее в одном потоке, создавая таким образом непрерывную серию интервальных событий. Поскольку время начала второго потока имеет значение для операции изменения, можно также использовать исходный точечный поток.

var signal = extended.ClipEventDuration(input, (e1, e2) => true);

В данном случае результат условия совпадения всегда принимает значение true, при этом выполняется поиск в одном логическом потоке, т. е. все события в потоке связаны с одним источником данных.

На следующей диаграмме показан результат преобразования «точка-сигнал» посредством оператора ClipEventDuration.

Указание на преобразование сигнала с помощью ClipEventDuration

Обе инструкции LINQ могут быть объединены в одну инструкцию.

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => true);

Если поток содержит несколько логических потоков (например, измерения из нескольких устройств или значения нескольких бирж), соответствующий ключ (идентификатор устройства или символ биржи) должен быть найден в логическом выражении.

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => e1.Symbol == e2.Symbol);

Создание сеанса

В другом случае для ClipEventDuration используется создание событий сеанса для того, чтобы добавить примечания к событиям, которые произошли во время сеанса. Предположим, используется следующая схема событий, которая описывает события взаимодействия пользователей.

public class EventType
{
    public int UserId;
    public string Type;
    public DateTime Time;
    public byte[] data;
};

В этом примере поле полезных данных Type может содержать значения «начало», «конец» или «другое» и описывать начало пользовательского сеанса, конец сеанса и пользовательские события во время сеанса соответственно. Поле Time содержит отметку времени взаимодействия, а поле data — другие сведения. Задача заключается в аннотировании каждого события и добавлении времени начала сеанса, в котором имело место данное событие. Далее предположим, что время ожидания каждого сеанса составляет 10 минут.

На следующей диаграмме показан пример ряда событий при таком сценарии.

Создание событий сеанса с помощью ClipEventDuration

Во-первых, увеличение длительности в соответствии с временем ожидания применяется ко всем событиями типа «начало»:

var sessionStarts = from e in input
                    where e.Type == “start”
                    select e;
var sessionStartsExt = sessionStarts.AlterEventDuration(e => TimeSpan.FromMinutes(10));

Далее события сеанса необходимо усечь до их соответствующих окончаний по каждому пользовательскому идентификатору.

var sessionEnds = from e in input
                  where e.Type == “end”
                  select e;
var sessions = sessionStartsExt.ClipEventDuration(sessionEnds, (e1, e2) => e1.UserId == e2.UserId);

Эти инструкции приведены на следующей диаграмме:

Изменение продолжительности сеанса с помощью ClipEventDuration

Теперь события сеанса могут быть добавлены к оставшимся событиям:

var sessionActivity = from e in input
                      where e.Type == “other”
                      select e;
var annotated = from s1 in sessions
                join s2 in sessionActivity
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

В операции соединения можно ссылаться на события sessionActivity, а также на поля события сеанса, что позволяет собрать аннотированное событие sessionActivity и добавить время начала сеанса к каждому событию sessionActivity:

Соединение событий сеанса с другими событиями

Поскольку условие соединения заключается в равенстве параметров UserId, то событие UserId, равное Y в sessionActivity, не учитывается для данного конкретного сеанса, в котором параметр UserId равен Х.

Инструкции LINQ могут быть представлены в более краткой форме:

var sessions = input
                 .Where(e => e.Type == “start”)
                 .AlterEventDuration(e => TimeSpan.FromMinutes(10))
                 .ClipEventDuration(input.Where(e => e.Type == “end”), (e1, e2) => e1.UserId == e2.UserId);
var annotated = from s1 in sessions
                join s2 in input.Where(e => e.Type == “other”)
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

См. также

Основные понятия

Основные понятия сервера служб StreamInsight