Add function for getting current wallclock time #687

Open
hazelnut-99 opened this issue Jul 15, 2024 · 4 comments
Labels
enhancement (New feature or request), good first issue (Good for newcomers)

Comments

@hazelnut-99

Hi!
I am trying to add a processing time timestamp to each row with the following query:

select now() as current_ts, "timestampMillis" as original_ts, some_other_column
from my_table

However, it seems that the now() function is executed only once, resulting in a static current_ts field that does not update over time.

Are there any keywords or functions that can generate a real-time timestamp for each record in the output?

@mwylde added the "enhancement" label on Jul 17, 2024
@mwylde
Member

mwylde commented Jul 17, 2024

Thanks for reporting. There are at least four possible definitions of "current time" in Arroyo:

  1. The time the query is started (which is now() in Arroyo)
  2. The current clock time
  3. The event time of the row it's called on
  4. The current watermark time

Currently Arroyo implements only the first one (via now()), but the others would be useful as well. The second is easy to get in a UDF today, but the other two would need to be built in.

In Flink, now() is the current clock time (2) in streaming mode, but the query start time (1) in batch mode. We might want to adopt that behavior, but I think the current version of now() is also useful in some cases and is worth preserving. Adding functions for 3 and 4 would also be great (in Flink, 4 is CURRENT_WATERMARK, which seems like a fine name).
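To make the four definitions concrete, here is a minimal sketch using only the Rust standard library. The `Row` and `Clocks` types, their method names, and the "max event time minus a lateness bound" watermark rule are all assumptions made for illustration; none of them are Arroyo APIs, and Arroyo's actual watermark strategy may differ.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical row that carries its own event time.
struct Row {
    event_time: SystemTime,
}

/// Hypothetical per-query clock state; not an Arroyo type.
struct Clocks {
    query_start: SystemTime,    // captured once when the query starts
    max_event_time: SystemTime, // highest event time observed so far
    max_lateness: Duration,     // assumed bound on out-of-order data
}

impl Clocks {
    fn new(max_lateness: Duration) -> Self {
        Clocks {
            query_start: SystemTime::now(),
            max_event_time: SystemTime::UNIX_EPOCH,
            max_lateness,
        }
    }

    /// Record a row so that the watermark can advance.
    fn observe(&mut self, row: &Row) {
        if row.event_time > self.max_event_time {
            self.max_event_time = row.event_time;
        }
    }

    /// 1. Query start time: identical for every row.
    fn query_start_time(&self) -> SystemTime {
        self.query_start
    }

    /// 2. Current clock time: re-read on every call.
    fn wallclock_time(&self) -> SystemTime {
        SystemTime::now()
    }

    /// 3. Event time of the row the function is called on.
    fn row_time(&self, row: &Row) -> SystemTime {
        row.event_time
    }

    /// 4. Watermark, here assumed to be max event time minus the lateness bound.
    fn watermark(&self) -> SystemTime {
        self.max_event_time
            .checked_sub(self.max_lateness)
            .unwrap_or(SystemTime::UNIX_EPOCH)
    }
}
```

Only `wallclock_time` changes between calls when no new rows arrive, which is the behavior the original question asks for.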

Writing new functions is pretty straightforward so this could be a great first issue for someone.

@mwylde added the "good first issue" label on Jul 17, 2024
@mwylde changed the title from "Add current unix timestamp to sink" to "Add function for getting current wallclock time" on Jul 17, 2024
@kpe

kpe commented Oct 16, 2024

I'm interested in obtaining the event time of a row (i.e., 3 above). I took a quick look at how this could be implemented, but couldn't find a similar function.
(I actually wasn't able to find the implementation of any of the standard functions in the source code; I only found extract_json_string.)

@mwylde, can you give me some guidance on where to look?

Can something like row_time() be implemented as a UDF? Is there a UDF type that has access to row "attributes" like the event_time?
Or should the _timestamp field that I see in the source code just be made accessible from SQL somehow?

@mwylde
Member

mwylde commented Oct 16, 2024

Hey @kpe, happy to walk through how you'd implement this. row_time() (i.e., 3 in the list above) can't be implemented as a UDF. The expression trees are planned from SQL with DataFusion, which doesn't know about the _timestamp field because it's not actually part of the SQL schema. We add it as part of various rewrites of the plan (you can find the calls to add_timestamp_field throughout arroyo-planner).

So instead we would need to define a placeholder UDF, like hop/tumble/session:

functions.insert(
"hop".to_string(),
Arc::new(create_udf(

This would then need to be rewritten (by traversing through the logical plan and all expressions) into an expression that gets the _timestamp field.
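The rewrite step could look roughly like the sketch below. It uses a toy `Expr` enum rather than DataFusion's real expression and plan types, so the enum, the `rewrite_row_time` function, and the `row_time` name are all hypothetical; a real implementation would go through the planner's own tree-rewriting machinery.

```rust
/// Toy expression tree; a stand-in for a real planner's expression type.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Call { name: String, args: Vec<Expr> },
}

/// Recursively replace every zero-argument `row_time()` placeholder call
/// with a reference to the `_timestamp` column.
fn rewrite_row_time(expr: Expr) -> Expr {
    match expr {
        // The placeholder itself becomes a column reference.
        Expr::Call { name, args } if name == "row_time" && args.is_empty() => {
            Expr::Column("_timestamp".to_string())
        }
        // Any other call is kept, but its arguments are rewritten too.
        Expr::Call { name, args } => Expr::Call {
            name,
            args: args.into_iter().map(rewrite_row_time).collect(),
        },
        // Columns and literals are left untouched.
        other => other,
    }
}
```

The placeholder only needs a type signature so that SQL planning succeeds; it is never executed, because the rewrite removes it before the plan runs.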

@prakkashm

prakkashm commented Dec 5, 2024

We have implemented a Rust UDF for this. It might seem hacky, but it works for us and serves our use cases.

This is for Arroyo v0.12.0.

/*
[dependencies]
chrono = "0.4"
*/

use arroyo_udf_plugin::udf;
use chrono::{Utc, DateTime, SecondsFormat};

#[udf]
fn current_timestamp_utc(dummy_arg: &str) -> Option<String> {
    // `dummy_arg` is unused; it is only here because of an Arroyo bug that requires this function to take an argument
    let utc_ts: DateTime<Utc> = Utc::now();
    Some(utc_ts.to_rfc3339_opts(SecondsFormat::Millis, true))
}

The corresponding SQL usage: current_timestamp_utc(event_ts) AS arroyo_ingestion_ts
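If a numeric Unix timestamp is preferred over an RFC 3339 string (as the issue's original title suggests), a standard-library-only variant might look like the following. This is a sketch, not tested as an Arroyo UDF: the function name `current_unix_millis` is made up, and registering it with Arroyo would presumably still need the `#[udf]` attribute and the dummy argument used in the snippet above.

```rust
use std::convert::TryFrom;
use std::time::{SystemTime, UNIX_EPOCH};

/// Wall-clock time as milliseconds since the Unix epoch.
/// Returns None if the system clock is set before 1970
/// or the value does not fit in an i64.
fn current_unix_millis() -> Option<i64> {
    let elapsed = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(elapsed.as_millis()).ok()
}
```

This avoids the chrono dependency, at the cost of leaving any formatting to the sink or a later SQL expression.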

4 participants