Skip to content

Commit

Permalink
chore: remove useless code and rename some struct
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Aug 22, 2024
1 parent 463e2f5 commit d859e74
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 338 deletions.
243 changes: 25 additions & 218 deletions src/pipeline/src/etl/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,71 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::ops::Deref;
use std::str::FromStr;

use ahash::{HashSet, HashSetExt};
use itertools::Itertools;

/// Newtype wrapper around a `Vec<Field>`.
#[derive(Debug, Default, Clone)]
pub struct Fields(Vec<Field>);

impl Fields {
    /// Builds a `Fields` from `fields` and validates it.
    ///
    /// # Errors
    ///
    /// Returns an error message when `fields` is empty or when two entries
    /// share the same input field name.
    pub(crate) fn new(fields: Vec<Field>) -> Result<Self, String> {
        Fields(fields).check()
    }

    /// Wraps a single field; no validation is performed.
    pub(crate) fn one(field: Field) -> Self {
        Fields(vec![field])
    }

    /// Returns the target field name of every entry, in order.
    pub(crate) fn get_target_fields(&self) -> Vec<&str> {
        let mut targets = Vec::with_capacity(self.0.len());
        for field in &self.0 {
            targets.push(field.get_target_field());
        }
        targets
    }

    /// Validates the collection and hands it back on success.
    ///
    /// The collection must be non-empty and every input field name must
    /// occur at most once.
    fn check(self) -> Result<Self, String> {
        if self.0.is_empty() {
            return Err("fields must not be empty".to_string());
        }

        let mut seen = HashSet::new();
        for field in &self.0 {
            let name = &field.input_field.name;
            // `insert` reports `false` when the name was already recorded.
            if !seen.insert(name) {
                return Err(format!(
                    "field name must be unique, but got duplicated: {}",
                    name
                ));
            }
        }

        Ok(self)
    }
}

/// Renders every field with its own `Display` impl, separated by `;`.
impl std::fmt::Display for Fields {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let rendered: Vec<String> = self.0.iter().map(ToString::to_string).collect();
        f.write_str(&rendered.join(";"))
    }
}

/// Exposes the wrapped `Vec<Field>` by shared reference, so read-only
/// vector/slice methods can be called directly on a `Fields` value.
impl std::ops::Deref for Fields {
type Target = Vec<Field>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Exposes the wrapped `Vec<Field>` by mutable reference.
///
/// NOTE(review): changes made through this reference are not re-validated by
/// `Fields::check`, so emptiness/uniqueness can be broken after construction.
impl std::ops::DerefMut for Fields {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

enum IndexInfo {
Index(usize),
NotSet,
Expand Down Expand Up @@ -179,12 +116,12 @@ impl OneInputMultiOutputField {
}

#[derive(Debug, Default, Clone)]
pub struct NewField {
pub struct Field {
pub(crate) input_field: String,
pub(crate) target_field: Option<String>,
}

impl FromStr for NewField {
impl FromStr for Field {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Expand All @@ -196,16 +133,20 @@ impl FromStr for NewField {
.to_string();
let target_field = parts.next().map(|x| x.trim().to_string());

Ok(NewField {
if input_field.is_empty() {
return Err("input field is empty".to_string());
}

Ok(Field {
input_field,
target_field,
})
}
}

impl NewField {
impl Field {
pub(crate) fn new(input_field: impl Into<String>, target_field: Option<String>) -> Self {
NewField {
Field {
input_field: input_field.into(),
target_field,
}
Expand All @@ -225,148 +166,35 @@ impl NewField {
}

#[derive(Debug, Default, Clone)]
pub struct NewFields(Vec<NewField>);
pub struct Fields(Vec<Field>);

impl NewFields {
pub(crate) fn new(fields: Vec<NewField>) -> Self {
NewFields(fields)
impl Fields {
pub(crate) fn new(fields: Vec<Field>) -> Self {
Fields(fields)
}

pub(crate) fn one(field: NewField) -> Self {
NewFields(vec![field])
pub(crate) fn one(field: Field) -> Self {
Fields(vec![field])
}
}

impl Deref for NewFields {
type Target = Vec<NewField>;
impl Deref for Fields {
type Target = Vec<Field>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl IntoIterator for NewFields {
type Item = NewField;
type IntoIter = std::vec::IntoIter<NewField>;
impl IntoIterator for Fields {
type Item = Field;
type IntoIter = std::vec::IntoIter<Field>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

/// Used to represent the input and output fields of a processor or transform.
#[derive(Debug, Default, Clone)]
pub struct Field {
/// The input field name and index.
pub input_field: InputFieldInfo,

/// The output field name and index mapping.
pub output_fields_index_mapping: BTreeMap<String, usize>,

/// Optional rename: when set, this is the target column name; when `None`,
/// the input field name is used as the target (see `get_target_field`).
pub target_field: Option<String>,

/// Optional list of target names for a 1-to-many mapping.
///
/// Per the original note, this is used by the following processors:
/// - csv
pub target_fields: Option<Vec<String>>,
}

impl Field {
    /// Creates a field for the given input name, with no rename, no 1-to-many
    /// targets and an empty output index mapping.
    pub(crate) fn new(field: impl Into<String>) -> Self {
        let input_field = InputFieldInfo::name(field.into());
        Self {
            input_field,
            output_fields_index_mapping: BTreeMap::new(),
            target_field: None,
            target_fields: None,
        }
    }

    /// Target column name in a processor or transform.
    ///
    /// Falls back to the input field name when `target_field` is `None`.
    pub(crate) fn get_target_field(&self) -> &str {
        if let Some(renamed) = self.target_field.as_deref() {
            return renamed;
        }
        &self.input_field.name
    }

    /// Input column name in a processor or transform.
    pub(crate) fn get_field_name(&self) -> &str {
        &self.input_field.name
    }

    /// Sets the input column index in a processor or transform.
    pub(crate) fn set_input_index(&mut self, index: usize) {
        self.input_field.index = index;
    }

    /// Overwrites the index stored for `key`.
    ///
    /// Does nothing when `key` is not already present in the mapping.
    pub(crate) fn set_output_index(&mut self, key: &str, index: usize) {
        if let Some(slot) = self.output_fields_index_mapping.get_mut(key) {
            *slot = index;
        }
    }

    /// Inserts `key` with `index`, replacing any index previously stored for it.
    pub(crate) fn insert_output_index(&mut self, key: String, index: usize) {
        self.output_fields_index_mapping.insert(key, index);
    }
}

/// Parses a comma-separated field specification of the form
/// `<field>[,<target_field>[,<target_fields>...]]`.
///
/// Every segment is trimmed. The first segment is the mandatory input field
/// name, the second (if non-empty) is the rename target, and any remaining
/// non-empty segments become the 1-to-many target list.
impl std::str::FromStr for Field {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut segments = s.split(',').map(str::trim);

        let name = segments.next().ok_or("field is missing")?;
        if name.is_empty() {
            return Err("field is empty".to_string());
        }

        // An empty second segment (e.g. "field,, a, b") means "no rename".
        let target_field = segments
            .next()
            .filter(|segment| !segment.is_empty())
            .map(str::to_string);

        // TODO(qtang): the trailing `<target_fields>` segments are an odd part of
        // this format; according to the original note they are only used by the
        // csv processor.
        let remaining: Vec<String> = segments
            .filter(|segment| !segment.is_empty())
            .map(str::to_string)
            .collect();
        let target_fields = Some(remaining).filter(|names| !names.is_empty());

        Ok(Field {
            input_field: InputFieldInfo::name(name.to_string()),
            output_fields_index_mapping: BTreeMap::new(),
            target_field,
            target_fields,
        })
    }
}

/// Formats the field as `<input>, <target>` when only a rename is set, as
/// `<input>, <t1>,<t2>,...` when only the 1-to-many targets are set, and as
/// the bare input name in every other case.
impl std::fmt::Display for Field {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let name = &self.input_field.name;

        if let (Some(target_field), None) = (&self.target_field, &self.target_fields) {
            return write!(f, "{}, {target_field}", name);
        }

        if let (None, Some(target_fields)) = (&self.target_field, &self.target_fields) {
            return write!(f, "{}, {}", name, target_fields.join(","));
        }

        // Neither set, or both set: only the input name is printed.
        write!(f, "{}", name)
    }
}

#[cfg(test)]
mod tests {
use crate::etl::field::Field;
Expand All @@ -384,35 +212,14 @@ mod tests {

let cases = [
// ("field", "field", None, None),
(
"field, target_field",
"field",
Some("target_field".into()),
None,
),
(
"field, target_field1, target_field2, target_field3",
"field",
Some("target_field1".into()),
Some(vec!["target_field2".into(), "target_field3".into()]),
),
(
"field,, target_field1, target_field2, target_field3",
"field",
None,
Some(vec![
"target_field1".into(),
"target_field2".into(),
"target_field3".into(),
]),
),
("field, target_field", "field", Some("target_field")),
("field", "field", None),
];

for (s, field, target_field, target_fields) in cases.into_iter() {
for (s, field, target_field) in cases.into_iter() {
let f: Field = s.parse().unwrap();
assert_eq!(f.get_field_name(), field, "{s}");
assert_eq!(f.target_field, target_field, "{s}");
assert_eq!(f.target_fields, target_fields, "{s}");
assert_eq!(f.input_field(), field, "{s}");
assert_eq!(f.target_field(), target_field, "{s}");
}
}
}
26 changes: 4 additions & 22 deletions src/pipeline/src/etl/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ use regex::{RegexProcessor, RegexProcessorBuilder};
use timestamp::{TimestampProcessor, TimestampProcessorBuilder};
use urlencoding::{UrlEncodingProcessor, UrlEncodingProcessorBuilder};

use super::field::{NewField, NewFields};
use crate::etl::field::{Field, Fields};
use super::field::{Field, Fields};
use crate::etl::value::Value;

const FIELD_NAME: &str = "field";
Expand Down Expand Up @@ -308,27 +307,10 @@ where
})
}

pub(crate) fn yaml_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields, String> {
let v = yaml_parse_strings(v, field)?;
Fields::new(v)
pub(crate) fn yaml_new_fileds(v: &yaml_rust::Yaml, field: &str) -> Result<Fields, String> {
yaml_parse_strings(v, field).map(Fields::new)
}

pub(crate) fn yaml_new_fileds(v: &yaml_rust::Yaml, field: &str) -> Result<NewFields, String> {
yaml_parse_strings(v, field).map(NewFields::new)
}

pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<NewField, String> {
yaml_parse_string(v, field)
}

pub(crate) fn yaml_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field, String> {
pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field, String> {
yaml_parse_string(v, field)
}

/// Registers, for every field, its target field name as an output key mapped
/// to index `0`.
pub(crate) fn update_one_one_output_keys(fields: &mut Fields) {
    fields.iter_mut().for_each(|field| {
        let target = field.get_target_field().to_string();
        field.output_fields_index_mapping.insert(target, 0_usize);
    });
}
Loading

0 comments on commit d859e74

Please sign in to comment.